In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, monotonically_increasing_id, lit, date_add, explode
import numpy as np
import matplotlib.pyplot as plt
import warnings
import plotly.express as px

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Project Big Data") \
    .master("spark://192.168.1.16:7077") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "1536m") \
    .getOrCreate()

print("SparkSession initialized successfully!")

SparkSession initialized successfully!


In [3]:
print(spark.conf.get("spark.master"))

spark://192.168.1.16:7077


In [4]:
import time
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DoubleType
)

# Measure the time taken to read the CSV file for performance evaluation
start_time = time.time()

# Read the CSV file with optimized options:
# - header: the file has a header row
# - inferSchema: false (since we have already defined the schema)
ps_classification_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("hdfs://192.168.1.16:9000/data/PS_20174392719_1491204439457_log.csv")

# Increase the number of partitions to optimize parallel processing
# Adjust the number of partitions based on the number of cores/workers in your cluster
ps_classification_df = ps_classification_df.repartition(24)

elapsed_time = time.time() - start_time
print("Time to read CSV file: {:.2f} seconds".format(elapsed_time))

Time to read CSV file: 31.17 seconds


In [5]:
ps_classification_df.show(5)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
| 212|TRANSFER|899732.34|C1397711092|          0.0|           0.0|C1158899502|    4052724.59|    4952456.93|      0|             0|
| 230|CASH_OUT|103761.13| C231781098|      61707.0|           0.0| C548992066|    1062619.33|    1166380.46|      0|             0|
| 230| PAYMENT| 16987.22| C712183135|          0.0|           0.0| M431352104|           0.0|           0.0|      0|             0|
| 212| PAYMENT|  5824.95| C829118865|          0.0|           0.0| M953100742|           0.0|           0.0|      0|             0|
| 232|CASH_OUT| 79267.42| C519353403|        222.0|           0.0| C57639466

## EDA

### Schema of the data

In [6]:
ps_classification_df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [7]:
from pyspark.sql.functions import col, round

summary_df = ps_classification_df.summary()
summary_df = summary_df.select(
    col("summary"),
    round(col("step"), 2).alias("step"),
    round(col("amount"), 2).alias("amount"),
    round(col("oldbalanceOrg"), 2).alias("oldbalanceOrg"),
    round(col("newbalanceOrig"), 2).alias("newbalanceOrig"),
    col("isFraud")
)
summary_df.show(truncate=False)

+-------+---------+-------------+-------------+--------------+--------------------+
|summary|step     |amount       |oldbalanceOrg|newbalanceOrig|isFraud             |
+-------+---------+-------------+-------------+--------------+--------------------+
|count  |6362620.0|6362620.0    |6362620.0    |6362620.0     |6362620             |
|mean   |243.4    |179861.9     |833883.1     |855113.67     |0.001290820448180152|
|stddev |142.33   |603858.23    |2888242.67   |2924048.5     |0.035904796801604175|
|min    |1.0      |0.0          |0.0          |0.0           |0                   |
|25%    |156.0    |13388.35     |0.0          |0.0           |0                   |
|50%    |239.0    |74852.61     |14204.16     |0.0           |0                   |
|75%    |335.0    |208695.82    |107303.66    |144207.43     |0                   |
|max    |743.0    |9.244551664E7|5.958504037E7|4.958504037E7 |1                   |
+-------+---------+-------------+-------------+--------------+--------------

In [8]:
duplicate_count = ps_classification_df.groupBy(ps_classification_df.columns).count().where("count > 1").select(count("*")).collect()[0][0]
print(f"Number of duplicate rows: {duplicate_count}")

ps_classification_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in ps_classification_df.columns]).show()

Number of duplicate rows: 0
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [9]:
ps_classification_df = (
    ps_classification_df.withColumnRenamed('nameOrig', 'origin')
       .withColumnRenamed('oldbalanceOrg', 'sender_old_balance')
       .withColumnRenamed('newbalanceOrig', 'sender_new_balance')
       .withColumnRenamed('nameDest', 'destination')
       .withColumnRenamed('oldbalanceDest', 'receiver_old_balance')
       .withColumnRenamed('newbalanceDest', 'receiver_new_balance')
       .withColumnRenamed('isFraud', 'isfraud')
)
ps_classification_df.columns

['step',
 'type',
 'amount',
 'origin',
 'sender_old_balance',
 'sender_new_balance',
 'destination',
 'receiver_old_balance',
 'receiver_new_balance',
 'isfraud',
 'isFlaggedFraud']

In [10]:
# drop column isFlaggedFraud

ps_classification_df = ps_classification_df.drop('isFlaggedFraud')

In [11]:
ps_classification_df.groupBy("isfraud", "type").count().orderBy("isfraud", "type").show()

+-------+--------+-------+
|isfraud|    type|  count|
+-------+--------+-------+
|      0| CASH_IN|1399284|
|      0|CASH_OUT|2233384|
|      0|   DEBIT|  41432|
|      0| PAYMENT|2151495|
|      0|TRANSFER| 528812|
|      1|CASH_OUT|   4116|
|      1|TRANSFER|   4097|
+-------+--------+-------+



## Visualize the data

In [12]:
ps_classification_df = ps_classification_df.withColumn(
    "type2",
    when((col("origin").contains("C")) & (col("destination").contains("C")), "CC")
    .when((col("origin").contains("C")) & (col("destination").contains("M")), "CM")
    .when((col("origin").contains("M")) & (col("destination").contains("C")), "MC")
    .when((col("origin").contains("M")) & (col("destination").contains("M")), "MM")
    .otherwise(None)
)
ps_classification_df.show(5)

+----+--------+---------+-----------+------------------+------------------+-----------+--------------------+--------------------+-------+-----+
|step|    type|   amount|     origin|sender_old_balance|sender_new_balance|destination|receiver_old_balance|receiver_new_balance|isfraud|type2|
+----+--------+---------+-----------+------------------+------------------+-----------+--------------------+--------------------+-------+-----+
| 134|CASH_OUT|139235.45|C1357341371|               0.0|               0.0|C1721599816|           559616.15|            698851.6|      0|   CC|
| 133|CASH_OUT|119530.71|C1486137070|               0.0|               0.0| C322662467|          3887841.04|          4082378.64|      0|   CC|
| 139| PAYMENT|  8699.19|C1103357114|               0.0|               0.0|M1379888655|                 0.0|                 0.0|      0|   CM|
| 138| CASH_IN| 33778.44|C1794878395|           29922.0|          63700.44|C1460662635|                 0.0|                 0.0|      0

In [13]:
fraud_trans = ps_classification_df.filter(col("isfraud") == 1)
valid_trans = ps_classification_df.filter(col("isfraud") == 0)

# Count occurrences of each type2 category for fraud transactions
print("Number of fraud transactions according to type are below:")
fraud_trans.groupBy("type2").agg(count("*").alias("count")).orderBy(col("count").desc()).show()

# Count occurrences of each type2 category for valid transactions
print("Number of valid transactions according to type are below:")
valid_trans.groupBy("type2").agg(count("*").alias("count")).orderBy(col("count").desc()).show()

Number of fraud transactions according to type are below:
+-----+-----+
|type2|count|
+-----+-----+
|   CC| 8213|
+-----+-----+

Number of valid transactions according to type are below:
+-----+-------+
|type2|  count|
+-----+-------+
|   CC|4202912|
|   CM|2151495|
+-----+-------+



In [14]:
# Drop 'origin' and 'destination' columns
ps_classification_df = ps_classification_df.drop("origin", "destination")

# Show the updated DataFrame
ps_classification_df.show()

+----+--------+---------+------------------+------------------+--------------------+--------------------+-------+-----+
|step|    type|   amount|sender_old_balance|sender_new_balance|receiver_old_balance|receiver_new_balance|isfraud|type2|
+----+--------+---------+------------------+------------------+--------------------+--------------------+-------+-----+
|  45|CASH_OUT| 45370.04|           20220.0|               0.0|            44798.09|            90168.13|      0|   CC|
|  43| PAYMENT| 17647.42|             353.0|               0.0|                 0.0|                 0.0|      0|   CM|
|  23| CASH_IN|169446.27|         837200.77|        1006647.03|           429690.03|           260243.76|      0|   CC|
|  44| PAYMENT| 14549.29|           50641.0|          36091.71|                 0.0|                 0.0|      0|   CM|
|  23| PAYMENT|  5780.19|           31373.0|          25592.81|                 0.0|                 0.0|      0|   CM|
|  44|TRANSFER|428879.21|            261

In [15]:
ps_classification_df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- sender_old_balance: double (nullable = true)
 |-- sender_new_balance: double (nullable = true)
 |-- receiver_old_balance: double (nullable = true)
 |-- receiver_new_balance: double (nullable = true)
 |-- isfraud: integer (nullable = true)
 |-- type2: string (nullable = true)



### Decision Tree Classifier

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

label_indexer = StringIndexer(inputCol="isfraud", outputCol="label")
ps_decision_tree_classifier_df = label_indexer.fit(ps_classification_df).transform(ps_classification_df)

# Define categorical and numerical columns
categorical_cols = ["type", "type2"]
numerical_cols = ["amount", "sender_old_balance", "sender_new_balance",
                  "receiver_old_balance", "receiver_new_balance"]

# Pipeline stages
stages = []

# StringIndexer and OneHotEncoder for categorical features
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"indexed_{col}")
    encoder = OneHotEncoder(inputCol=f"indexed_{col}", outputCol=f"encoded_{col}")
    stages.extend([indexer, encoder])

# Assemble all features into a single vector
assembler_inputs = [f"encoded_{col}" for col in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)


In [17]:
# Decision Tree Classifier
dt_classifier = DecisionTreeClassifier(labelCol="isfraud", featuresCol="features", seed=42)
stages.append(dt_classifier)

# Create a Pipeline
pipeline = Pipeline(stages=stages)

# Split data into training and test sets (stratified split)
training, test = ps_decision_tree_classifier_df.randomSplit([0.8, 0.2], seed=42)

# Parameter grid for cross-validation (hyperparameter tuning)
param_grid = ParamGridBuilder() \
    .addGrid(dt_classifier.maxDepth, [1,2]) \
    .addGrid(dt_classifier.maxBins, [32, 64]) \
    .build()

# Train the cross-validated model (F1 as the optimization metric)
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="f1"),
    numFolds=3
)

cv_model = cv.fit(training)

# Make predictions on the test set
predictions = cv_model.transform(test)

# Define evaluators for different metrics
evaluators = {
    "F1-Score": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="f1"),
    "Accuracy": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="accuracy"),
    "Precision": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="weightedPrecision"),
    "Recall": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="weightedRecall")
}

# Compute and print all evaluation metrics
metrics = {metric_name: evaluator.evaluate(predictions) for metric_name, evaluator in evaluators.items()}
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")

# Return trained model and metrics
cv_model, metrics


F1-Score: 0.9988
Accuracy: 0.9991
Precision: 0.9990
Recall: 0.9991


(CrossValidatorModel_b3fa2f38f815,
 {'F1-Score': 0.9988391896438635,
  'Accuracy': 0.9990813561666226,
  'Precision': 0.9990189177158473,
  'Recall': 0.9990813561666227})

### Logistic Regression model

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

label_indexer = StringIndexer(inputCol="isfraud", outputCol="label")
ps_logistic_regression_df = label_indexer.fit(ps_classification_df).transform(ps_classification_df)

# Define categorical and numerical columns
categorical_cols = ["type", "type2"]
numerical_cols = ["amount", "sender_old_balance", "sender_new_balance",
                  "receiver_old_balance", "receiver_new_balance"]

# Pipeline stages
stages = []

# StringIndexer and OneHotEncoder for categorical features
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"indexed_{col}")
    encoder = OneHotEncoder(inputCol=f"indexed_{col}", outputCol=f"encoded_{col}")
    stages.extend([indexer, encoder])

# Assemble all features into a single vector
assembler_inputs = [f"encoded_{col}" for col in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)


In [19]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Define Logistic Regression model
lr_classifier = LogisticRegression(labelCol="isfraud", featuresCol="features", maxIter=100, regParam=0.01)

# Add model to the pipeline
lr_pipeline = Pipeline(stages=stages + [lr_classifier])

# Split data into training and test sets
training, test = ps_logistic_regression_df.randomSplit([0.8, 0.2], seed=42)

# Hyperparameter tuning grid
lr_param_grid = ParamGridBuilder() \
    .addGrid(lr_classifier.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr_classifier.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross-validation
lr_cv = CrossValidator(
    estimator=lr_pipeline,
    estimatorParamMaps=lr_param_grid,
    evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="f1"),
    numFolds=3
)

# Train model
lr_cv_model = lr_cv.fit(training)

# Make predictions
lr_predictions = lr_cv_model.transform(test)

# Evaluate metrics
lr_metrics = {
    "F1-Score": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="f1").evaluate(lr_predictions),
    "Accuracy": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="accuracy").evaluate(lr_predictions),
    "Precision": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="weightedPrecision").evaluate(lr_predictions),
    "Recall": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="weightedRecall").evaluate(lr_predictions),
}
# Print metrics
print("\n🔍 Logistic Regression Metrics:")
for metric, value in lr_metrics.items():
    print(f"{metric}: {value:.4f}")


🔍 Logistic Regression Metrics:
F1-Score: 0.9981
Accuracy: 0.9988
Precision: 0.9975
Recall: 0.9988


### Naïve Bayes model

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

label_indexer = StringIndexer(inputCol="isfraud", outputCol="label")
ps_naive_bayes_model_df = label_indexer.fit(ps_classification_df).transform(ps_classification_df)

# Define categorical and numerical columns
categorical_cols = ["type", "type2"]
numerical_cols = ["amount", "sender_old_balance", "sender_new_balance",
                  "receiver_old_balance", "receiver_new_balance"]

# Pipeline stages
stages = []

# StringIndexer and OneHotEncoder for categorical features
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"indexed_{col}")
    encoder = OneHotEncoder(inputCol=f"indexed_{col}", outputCol=f"encoded_{col}")
    stages.extend([indexer, encoder])

# Assemble all features into a single vector
assembler_inputs = [f"encoded_{col}" for col in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)


In [21]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define Naïve Bayes model
nb_classifier = NaiveBayes(labelCol="isfraud", featuresCol="features", modelType="multinomial")

# Add model to the pipeline
nb_pipeline = Pipeline(stages=stages + [nb_classifier])

# Split data into training and test sets
training, test = ps_naive_bayes_model_df.randomSplit([0.8, 0.2], seed=42)

# Hyperparameter tuning grid
nb_param_grid = ParamGridBuilder() \
    .addGrid(nb_classifier.smoothing, [0.0, 1.0, 10.0]) \
    .build()

# Cross-validation
nb_cv = CrossValidator(
    estimator=nb_pipeline,
    estimatorParamMaps=nb_param_grid,
    evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="f1"),
    numFolds=3
)

# Train model
nb_cv_model = nb_cv.fit(training)

# Make predictions
nb_predictions = nb_cv_model.transform(test)

# Evaluate metrics
nb_metrics = {
    "F1-Score": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="f1").evaluate(nb_predictions),
    "Accuracy": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="accuracy").evaluate(nb_predictions),
    "Precision": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="weightedPrecision").evaluate(nb_predictions),
    "Recall": MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="isfraud", metricName="weightedRecall").evaluate(nb_predictions),
}

# Print metrics
print("\n🔍 Naïve Bayes Metrics:")
for metric, value in nb_metrics.items():
    print(f"{metric}: {value:.4f}")



🔍 Naïve Bayes Metrics:
F1-Score: 0.9525
Accuracy: 0.9115
Precision: 0.9983
Recall: 0.9115


In [22]:
spark.stop()