In [1]:
# Install PySpark and import necessary libraries
!pip install pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

import warnings
# Filter out DeprecationWarnings
warnings.filterwarnings("ignore", category=DeprecationWarning)


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName('fraud_detection').getOrCreate()


In [3]:
# Load the fraud detection dataset as a Spark dataframe
df = spark.read.csv("./fraud_detection_dataset.csv", inferSchema=True, header=True)

# Print the schema and sample data
df.printSchema()
df.show(2)


root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|186

In [4]:
# get the number of rows in the DataFrame
row_count = df.count()

# print the number of rows
print("Number of rows: ", row_count)

Number of rows:  3127948


In [5]:
# Select the relevant columns for the analysis
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")

# Print the schema and sample data after selecting relevant columns
df.show(2)
df.printSchema()


+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows

root
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- isFraud: integer (nullable = true)



In [6]:
# Check for missing values and drop rows with missing values
data_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])
data_agg.show()
df = df.dropna()
data_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])
data_agg.show()


+----+------+-------------+--------------+-------+
|type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+----+------+-------------+--------------+-------+
|   1|     1|            1|             1|      1|
+----+------+-------------+--------------+-------+

+----+------+-------------+--------------+-------+
|type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+----+------+-------------+--------------+-------+
|   0|     0|            0|             0|      0|
+----+------+-------------+--------------+-------+



In [7]:
# Group the data by transaction type to understand the distribution of transactions
df.groupBy('type').count().show()


+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 258994|
| CASH_IN| 686608|
|CASH_OUT|1116607|
| PAYMENT|1046141|
|   DEBIT|  19597|
+--------+-------+



In [8]:
# Split the data into training and testing sets
train, test = df.randomSplit([0.7, 0.3], seed=40)
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")


Train set length: 2189829 records
Test set length: 938118 records


In [9]:
# Print the first two rows of the training set and the data types of each column
train.show(2)
train.dtypes

# Identify categorical and numerical columns
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [x for (x, dataType) in train.dtypes if (dataType == "double")]

# Print the identified categorical and numerical columns
print(numCols)
print(catCols)

# Check the number of distinct transaction types in the training set
train.agg(F.countDistinct("type")).show()
# Group the training set by transaction type to understand the distribution of transactions
train.groupBy("type").count().show()


+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']
+-----------+
|count(type)|
+-----------+
|          5|
+-----------+

+--------+------+
|    type| count|
+--------+------+
|TRANSFER|181041|
| CASH_IN|480391|
|CASH_OUT|782897|
| PAYMENT|731771|
|   DEBIT| 13729|
+--------+------+



In [10]:
# Use StringIndexer and OneHotEncoder to transform categorical columns to numerical values
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [11]:

string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]
string_indexe=string_indexer[0].fit(df).transform(df)
string_indexe.show()


+--------+---------+-------------+--------------+-------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|
+--------+---------+-------------+--------------+-------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|
| PAYMENT|  7861.64|    176087.23|     168225.59|      0|               1.0|
| PAYMENT|  4024.36|       2671.0|           0.0|      0|               1.0|
|   DEBIT|  5337.77|      41720.0|      36382.23|      0|               4.0|

In [12]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]
one_hot_encoder_df=one_hot_encoder[0].fit(string_indexe).transform(string_indexe)
one_hot_encoder_df.show()

+--------+---------+-------------+--------------+-------+------------------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|
+--------+---------+-------------+--------------+-------+------------------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|     (4,[1],[1.0])|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|     (4,[3],[1.0])|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|     (4,[0],[1.0])|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7861.64|    176087.23|     1

In [13]:

# Use VectorAssembler to combine numerical and categorical columns into a feature vector
from pyspark.ml.feature import VectorAssembler

assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]
stages


[StringIndexer_488712a55d6e,
 OneHotEncoder_bc4b162c08e6,
 VectorAssembler_25c15d78ec48]

In [14]:
# Create a pipeline to execute the transformation stages in a specific order
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages(stages)


In [15]:
# Define a pipeline and fit it to the training data
model = pipeline.fit(train)
# Transform the training data using the pipeline
pp_df_train = model.transform(train)
pp_df_train.show()

# Select relevant features and labels from the transformed training data
data_train = pp_df_train.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)
data_train.show()


+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_features|
+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|               2.0|     (4,[2],[1.0])|    [1.42,1270713.41,...|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|               2.0|     (4,[2],[1.0])|    [4.35,4136277.22,...|
|CASH_IN|  4.58|      94241.0|      94245.58|      0|               2.0|     (4,[2],[1.0])|    [4.58,94241.0,942...|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|               2.0|     (4,[2],[1.0])|    [4.71,50198.0,502...|
|CASH_IN|  5.19|      18104.0|      18109.19|      0|               2.0|     (4,[2],[1.0])|    [5.19,18104.0,181...|
|CASH_IN|  5.44|          0.0|          5.44|      0|           

In [16]:
# Fit the pipeline to the test data and transform it
model = pipeline.fit(test)
pp_df_test = model.transform(test)
pp_df_test.show()

# Select relevant features and labels from the transformed test data
data_test = pp_df_test.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)
data_test.show(5, truncate=False)


+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_features|
+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|CASH_IN|  6.76|      11322.0|      11328.76|      0|               2.0|     (4,[2],[1.0])|    [6.76,11322.0,113...|
|CASH_IN|  8.29|      20392.0|      20400.29|      0|               2.0|     (4,[2],[1.0])|    [8.29,20392.0,204...|
|CASH_IN|  9.02|   2416260.59|    2416269.61|      0|               2.0|     (4,[2],[1.0])|    [9.02,2416260.59,...|
|CASH_IN| 13.86|   6868100.18|    6868114.04|      0|               2.0|     (4,[2],[1.0])|    [13.86,6868100.18...|
|CASH_IN| 14.36|    613030.46|     613044.82|      0|               2.0|     (4,[2],[1.0])|    [14.36,613030.46,...|
|CASH_IN|  14.4|1.143460813E7| 1.143462253E7|      0|           

In [17]:
# Evaluate the logistic regression model using a binary classification evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
evaluator.setLabelCol("label")

BinaryClassificationEvaluator_0d8eeb7e3bc3

In [18]:
from pyspark.sql.functions import col

def compute_accuracy(predictions):
  
    correct_predictions = predictions.filter(col('prediction') == col('label')).count()
    total_predictions = predictions.count()
    accuracy = correct_predictions / total_predictions
    return accuracy

In [19]:
# Train a decision tree classifier on the training data
from pyspark.ml.classification import DecisionTreeClassifier
model = DecisionTreeClassifier(maxDepth=5).fit(data_train)

# Use the trained decision tree classifier to make predictions on the test data
data_pred = model.transform(data_test)

# Evaluate the decision tree classifier using a binary classification evaluator
eval_results = evaluator.evaluate(data_pred)
print("Evaluation results: ",eval_results)

accuracy = compute_accuracy(data_pred)
print("Accuracy: {}%".format(accuracy * 100))

Evaluation results:  0.2147684242473541
Accuracy: 99.93924005295709%


In [20]:
# Train a random forest classifier on the training data
from pyspark.ml.classification import RandomForestClassifier
model = RandomForestClassifier(maxDepth=5).fit(data_train)

# Use the trained random forest classifier to make predictions on the test data
data_pred=model.transform(data_test)

# Evaluate the random forest classifier using a binary classification evaluator
eval_results = evaluator.evaluate(data_pred)
print("Evaluation results: ",eval_results)

accuracy = compute_accuracy(data_pred)
print("Accuracy: {}%".format(accuracy * 100))

Evaluation results:  0.963044325089312
Accuracy: 99.93689493219404%


In [21]:
# Train a Naive Bayes classifier on the training data
from pyspark.ml.classification import NaiveBayes
model = NaiveBayes().fit(data_train)

# Use the trained Naive Bayes classifier to make predictions on the test data
data_pred=model.transform(data_test)

# Evaluate the Naive Bayes classifier using a binary classification evaluator
eval_results = evaluator.evaluate(data_pred)
print("Evaluation results: ",eval_results)

accuracy = compute_accuracy(data_pred)
print("Accuracy: {}%".format(accuracy * 100))

Evaluation results:  0.35213388110452065
Accuracy: 40.76704636303749%


In [22]:
# Train a logistic regression model on the training data
from pyspark.ml.classification import LogisticRegression
model = LogisticRegression().fit(data_train)

# Use the trained logistic regression model to make predictions on the test data
data_pred=model.transform(data_test)
# Evaluate the logistic regression using a binary classification evaluator
eval_results = evaluator.evaluate(data_pred)
print("Evaluation results: ",eval_results)

accuracy = compute_accuracy(data_pred)
print("Accuracy: {}%".format(accuracy * 100))

Evaluation results:  0.9921360640216541
Accuracy: 99.94297092689833%


In [23]:
# Generate a confusion matrix
from pyspark.mllib.evaluation import MulticlassMetrics

predictionsAndLabels = data_pred.select(['prediction', 'label']).rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = MulticlassMetrics(predictionsAndLabels)

print('Confusion matrix:')
print(metrics.confusionMatrix().toArray())



Confusion matrix:
[[9.37276e+05 5.80000e+01]
 [4.77000e+02 3.07000e+02]]
