In [None]:
# pyspark installation
!pip install pyspark

In [182]:
# first we have to initialize pyspark before doing anything
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Fraud Detection Pipeline") \
    .getOrCreate()

fileDataset = spark.read.csv("fraud_detection.csv", header=True, inferSchema=True)

# print data to make sure everything looks good
fileDataset.printSchema()
fileDataset.show(5)


root
 |-- months_as_customer: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- policy_number: integer (nullable = true)
 |-- policy_bind_date: date (nullable = true)
 |-- policy_state: string (nullable = true)
 |-- policy_csl: string (nullable = true)
 |-- policy_deductable: integer (nullable = true)
 |-- policy_annual_premium: double (nullable = true)
 |-- umbrella_limit: integer (nullable = true)
 |-- insured_zip: integer (nullable = true)
 |-- insured_sex: string (nullable = true)
 |-- insured_education_level: string (nullable = true)
 |-- insured_occupation: string (nullable = true)
 |-- insured_hobbies: string (nullable = true)
 |-- insured_relationship: string (nullable = true)
 |-- capital-gains: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- incident_date: date (nullable = true)
 |-- incident_type: string (nullable = true)
 |-- collision_type: string (nullable = true)
 |-- incident_severity: string (nullable = true)
 |-- authoritie

In [183]:
# all necessary imports
from pyspark.sql.functions import col, when, isnan, mean
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, unix_timestamp

In [184]:
# Here we convert the policy_bind_date and incident_date to the correct date format we use here
fileDataset = fileDataset.withColumn("policy_bind_date", to_date(col("policy_bind_date"), "yyyy-MM-dd"))
fileDataset = fileDataset.withColumn("incident_date", to_date(col("incident_date"), "yyyy-MM-dd"))

In [207]:
# here we have to make sure to show any rows that failed to convert and if they do they are set to null to avoid any furhter problems and print them
wrongDates = fileDataset.filter(col("policy_bind_date").isNull() | col("incident_date").isNull())
print("These are the rows with invalid dates:")
wrongDates.show()

These are the rows with invalid dates:
+------------------+---+-------------+----------------+------------+----------+-----------------+---------------------+--------------+-----------+-----------+-----------------------+------------------+---------------+--------------------+-------------+------------+-------------+-------------+--------------+-----------------+---------------------+--------------+-------------+-----------------+------------------------+---------------------------+---------------+---------------+---------+-----------------------+------------------+------------+--------------+-------------+---------+----------+---------+--------------+-----------------------+
|months_as_customer|age|policy_number|policy_bind_date|policy_state|policy_csl|policy_deductable|policy_annual_premium|umbrella_limit|insured_zip|insured_sex|insured_education_level|insured_occupation|insured_hobbies|insured_relationship|capital-gains|capital-loss|incident_date|incident_type|collision_type|inciden

In [186]:
# here we calculate the difference in days between policy_bind_date and incident_date and it is stored in a new column named as policy_to_indicident_days
fileDataset = fileDataset.withColumn("policy_to_incident_days", datediff(col("incident_date"), col("policy_bind_date")))
#print the update then display 5 rows of the new column
fileDataset.printSchema()
fileDataset.select("policy_to_incident_days").show(5)

root
 |-- months_as_customer: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- policy_number: integer (nullable = true)
 |-- policy_bind_date: date (nullable = true)
 |-- policy_state: string (nullable = true)
 |-- policy_csl: string (nullable = true)
 |-- policy_deductable: integer (nullable = true)
 |-- policy_annual_premium: double (nullable = true)
 |-- umbrella_limit: integer (nullable = true)
 |-- insured_zip: integer (nullable = true)
 |-- insured_sex: string (nullable = true)
 |-- insured_education_level: string (nullable = true)
 |-- insured_occupation: string (nullable = true)
 |-- insured_hobbies: string (nullable = true)
 |-- insured_relationship: string (nullable = true)
 |-- capital-gains: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- incident_date: date (nullable = true)
 |-- incident_type: string (nullable = true)
 |-- collision_type: string (nullable = true)
 |-- incident_severity: string (nullable = true)
 |-- authoritie

In [201]:
# there we many issues caused with negative values, null, nan so we try to rid of that issue
# these are the columns where they include numerical values
numValColumn = ["months_as_customer", "age", "policy_deductable", "policy_annual_premium", 
                     "umbrella_limit", "number_of_vehicles_involved", "bodily_injuries", "witnesses",
                     "total_claim_amount", "injury_claim", "property_claim", "vehicle_claim", 
                     "policy_to_incident_days"]

# here we are iterating thru each numerical column and check for "invalid" data
for column in numValColumn:
    print(f"Let's check the column: {column}")
    fileDataset.select(
        col(column),
        isnan(column).alias("nanVal"),
        col(column).isNull().alias("nullVal"),
        (col(column) == float("inf")).alias("infVal"),
        (col(column) == -float("inf")).alias("negInfVal")
    ).filter("nanVal OR nullVal OR infVal OR negInfVal").show()

Let's check this column: months_as_customer
+------------------+------+-------+------+---------+
|months_as_customer|nanVal|nullVal|infVal|negInfVal|
+------------------+------+-------+------+---------+
+------------------+------+-------+------+---------+

Let's check this column: age
+---+------+-------+------+---------+
|age|nanVal|nullVal|infVal|negInfVal|
+---+------+-------+------+---------+
+---+------+-------+------+---------+

Let's check this column: policy_deductable
+-----------------+------+-------+------+---------+
|policy_deductable|nanVal|nullVal|infVal|negInfVal|
+-----------------+------+-------+------+---------+
+-----------------+------+-------+------+---------+

Let's check this column: policy_annual_premium
+---------------------+------+-------+------+---------+
|policy_annual_premium|nanVal|nullVal|infVal|negInfVal|
+---------------------+------+-------+------+---------+
+---------------------+------+-------+------+---------+

Let's check this column: umbrella_lim

In [204]:
#here we first identify the numerical values in the columans
numValColumn = [field.name for field in fileDataset.schema.fields if field.dataType.simpleString() in ["int", "double", "float"]]

# here we are iterating through all numerical columns to clean 
for column in numValColumn:
    fileDataset = fileDataset.withColumn(
        column,
        when((col(column).isNull()) | (isnan(col(column))) | (col(column) < 0) |
             (col(column) == float("inf")) | (col(column) == -float("inf")), 0)
        .otherwise(col(column))
    )
    
# here we are identifying the categorical columns aka columns with words or letters
catValColumns = [field.name for field in fileDataset.schema.fields if field.dataType.simpleString() == "string"]

# here we are replacing categorical columns with a place holder
for column in catValColumns:
    fileDataset = fileDataset.fillna("Unknown", subset=[column])

In [206]:
# Here we first verify
print("First we the verify the numValColumsn")

# then we loop through all numerical columns to confirm for any invalid values
for column in numValColumn:
    print(f"Let's check the column: {column}")
    fileDataset.select(
        col(column),
        isnan(col(column)).alias("nanVal"),
        col(column).isNull().alias("nullVal"),
        (col(column) < 0).alias("negVal")
    ).filter("nanVal OR nullVal OR negVal").show()

# Doing the same thing as above, verify then loop thru categorical now to confirm no null values
print("Second we verify the catValColumns")
for column in catValColumns:
    print(f"Let's check the column: {column}")
    fileDataset.select(
        col(column)
    ).filter(col(column).isNull()).show()


First we the verify the numValColumsn
Let's check the column: months_as_customer
+------------------+------+-------+------+
|months_as_customer|nanVal|nullVal|negVal|
+------------------+------+-------+------+
+------------------+------+-------+------+

Let's check the column: age
+---+------+-------+------+
|age|nanVal|nullVal|negVal|
+---+------+-------+------+
+---+------+-------+------+

Let's check the column: policy_number
+-------------+------+-------+------+
|policy_number|nanVal|nullVal|negVal|
+-------------+------+-------+------+
+-------------+------+-------+------+

Let's check the column: policy_deductable
+-----------------+------+-------+------+
|policy_deductable|nanVal|nullVal|negVal|
+-----------------+------+-------+------+
+-----------------+------+-------+------+

Let's check the column: policy_annual_premium
+---------------------+------+-------+------+
|policy_annual_premium|nanVal|nullVal|negVal|
+---------------------+------+-------+------+
+------------------

In [191]:
#Naive Beyes Model

In [209]:
# here is the list of categorical columsm
catValColumns = ['policy_state', 'policy_csl', 'insured_sex', 'insured_education_level',
                       'insured_occupation', 'insured_hobbies', 'insured_relationship',
                       'incident_type', 'collision_type', 'incident_severity', 'authorities_contacted',
                       'incident_state', 'incident_city', 'auto_make', 'auto_model']

# here we are doing string indexing and one hot encoder for above columns 
# string indexing essentially converts string based categorical features into numerical index
# then the one hot encoder converts the indicis into vectors
catToNum = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="skip") for col in catValColumns]
indToVec = [OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_vec") for col in catValColumns]

# here we are simply combining multiple columns which includes both muerical and encoded categorical features into a single vector
assembler = VectorAssembler(
    inputCols=[f"{col}_vec" for col in catValColumns] + 
              ["months_as_customer", "age", "policy_deductable", "policy_annual_premium", 
               "umbrella_limit", "number_of_vehicles_involved", "bodily_injuries", "witnesses", 
               "total_claim_amount", "injury_claim", "property_claim", "vehicle_claim", 
               "policy_to_incident_days"],
    outputCol="features"
)

In [210]:
#here we are converting fraud_reported column into a numerical label we have to do this for naive bayes
indexLab = StringIndexer(inputCol="fraud_reported", outputCol="label", handleInvalid="skip")

# Here we are essentially defining naive bayes classifier and initialize
nb = NaiveBayes(featuresCol="features", labelCol="label", predictionCol="prediction")

# here the pipeline will consist of string indexing of categorical columns, one hot endocing of the same, vector assemble for feautre creation
# target variable index. Basically this is neeeded to train the model
pipeline = Pipeline(stages=catToNum + indToVec + [assembler, indexLab
                                                  , nb])

#here we are doing data split to be optimal we decided for a 70% and 30% split for training and testing respectively
DataForTraining, DataForTesting = fileDataset.randomSplit([0.7, 0.3], seed=42)

# here we are training the model by applying all transformatioms
model = pipeline.fit(DataForTraining)

# now after training, we are using it to make predictions
predictions = model.transform(DataForTesting)

# here we are doing model evaluation using the metrics AOC RUC score, and accuracy
AUCScore = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
AccScore = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

#Definining them
auc = AUCScore.evaluate(predictions)
accuracy = AccScore.evaluate(predictions)

# here we just simply display the scores of each
print(f"Naive Bayes Model's AUC ROC: {auc:.4f}")
print(f"Naive Bayes Model's Accuracy: {accuracy:.4f}")

# here we are displaying the first 10 rows of predictions
predictions.select("label", "prediction", "probability", "features").show(10)

Naive Bayes Model's AUC ROC: 0.5466
Naive Bayes Model's Accuracy: 0.6953
+-----+----------+-----------+--------------------+
|label|prediction|probability|            features|
+-----+----------+-----------+--------------------+
|  0.0|       0.0|  [1.0,0.0]|(137,[1,2,4,8,14,...|
|  1.0|       1.0|  [0.0,1.0]|(137,[1,3,4,5,16,...|
|  0.0|       1.0|  [0.0,1.0]|(137,[1,2,8,17,32...|
|  1.0|       0.0|  [1.0,0.0]|(137,[0,4,13,45,4...|
|  0.0|       0.0|  [1.0,0.0]|(137,[0,2,4,8,11,...|
|  0.0|       0.0|  [1.0,0.0]|(137,[6,12,30,47,...|
|  0.0|       0.0|  [1.0,0.0]|(137,[8,34,44,48,...|
|  0.0|       0.0|  [1.0,0.0]|(137,[1,3,4,8,12,...|
|  1.0|       0.0|  [1.0,0.0]|(137,[3,9,19,36,4...|
|  1.0|       0.0|  [1.0,0.0]|(137,[0,4,9,11,24...|
+-----+----------+-----------+--------------------+
only showing top 10 rows



In [211]:
# here we are using the predictions to determine another set of metrics to judge the model
Pred = predictions.select("prediction", "label").rdd.map(lambda row: (float(row.prediction), float(row.label)))
metrics = MulticlassMetrics(Pred)

# here we are displaying the metrics of confusion matrix, precision, recall, f-1 score
print("Confusion Matrix:\n", metrics.confusionMatrix().toArray())
print("Precision (Fraud):", metrics.precision(1.0))  
print("Recall (Fraud):", metrics.recall(1.0))        
print("F1-Score (Fraud):", metrics.fMeasure(1.0))    


Confusion Matrix:
 [[162.  34.]
 [ 44.  16.]]
Precision (Fraud): 0.32
Recall (Fraud): 0.26666666666666666
F1-Score (Fraud): 0.2909090909090909


In [212]:
#Random Forest Model

In [213]:
# Now we are doing The Random Forest Model in Pyspark
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# here, we are using input feature vector column, target column, decision trees, and seed this all needs to done for the Random Forest Model
RandomForestClass = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50, seed=42)

# Here we are creating a new pipeline for the random forest model but we are reusing all he existing preprocessing steps that we did for naive bayes
RandomForestPipeline = Pipeline(stages=catToNum + indToVec + [assembler, indexLab, RandomForestClass])

# here we are doing a data split, 70% to 30%, training and testing as we did previously as well
DataForTraining, DataForTesting = fileDataset.randomSplit([0.7, 0.3], seed=42)

# here we train the random forerst model
RandomForestModel = RandomForestPipeline.fit(DataForTraining)

# here now after training we will use it to make predictions
RandomForestPredictions = RandomForestModel.transform(DataForTesting)

# here now we are evaluating the model using the same metrics as earlier
AUCScore = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
AccScore = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# define the metrics
rf_auc = AUCScore.evaluate(RandomForestPredictions)
rf_accuracy = AccScore.evaluate(RandomForestPredictions)

# print them
print(f"Random Forest Model's AUC ROC: {rf_auc:.4f}")
print(f"Random Forest Model's Accuracy: {rf_accuracy:.4f}")

# here we are making a confusion matrix, doing the same thing we did previously then display them
Pred = RandomForestPredictions.select("prediction", "label").rdd.map(lambda row: (float(row.prediction), float(row.label)))
RandomForestMetrics = MulticlassMetrics(Pred)

print("Confusion Matrix:\n", RandomForestMetrics.confusionMatrix().toArray())
print("Precision (Fraud):", RandomForestMetrics.precision(1.0))
print("Recall (Fraud):", RandomForestMetrics.recall(1.0))
print("F1-Score (Fraud):", RandomForestMetrics.fMeasure(1.0))

# here we will display the predictions of the first 10
RandomForestPredictions.select("label", "prediction", "probability", "features").show(10)


Random Forest Model's AUC ROC: 0.8307
Random Forest Model's Accuracy: 0.7578
Confusion Matrix:
 [[183.  13.]
 [ 49.  11.]]
Precision (Fraud): 0.4583333333333333
Recall (Fraud): 0.18333333333333332
F1-Score (Fraud): 0.2619047619047619
+-----+----------+--------------------+--------------------+
|label|prediction|         probability|            features|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[0.86760247831792...|(137,[1,2,4,8,14,...|
|  1.0|       0.0|[0.65530721026455...|(137,[1,3,4,5,16,...|
|  0.0|       0.0|[0.89299103755523...|(137,[1,2,8,17,32...|
|  1.0|       0.0|[0.54450080268872...|(137,[0,4,13,45,4...|
|  0.0|       0.0|[0.91874245954417...|(137,[0,2,4,8,11,...|
|  0.0|       0.0|[0.81799600587318...|(137,[6,12,30,47,...|
|  0.0|       1.0|[0.47654684646596...|(137,[8,34,44,48,...|
|  0.0|       0.0|[0.84467320584047...|(137,[1,3,4,8,12,...|
|  1.0|       0.0|[0.63009176206628...|(137,[3,9,19,36,4...|
|  1.0|       1.0|[0.4895443638094

In [None]:
XGBoosting Model


In [214]:
# Now we will work on the XGBoosting model first we import these
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# here we will define the model same thing as above
XGBoostingClass = GBTClassifier(featuresCol="features", labelCol="label", maxIter=50, maxDepth=5, seed=42)

# here we will be creating a new pipeline for the XGBoosting model
XGBoostinPipeline = Pipeline(stages=catToNum + indToVec + [assembler, indexLab, XGBoostingClass])

# here we are doing a data split, 70% to 30%, training and testing as we did previously as well
DataForTraining, DataForTesting = fileDataset.randomSplit([0.7, 0.3], seed=42)

# here we will be training the XGBoosting model
XGBoostingModel = XGBoostinPipeline.fit(DataForTraining)

# here now we will be using it to make predictions
XGBoostingPredictions = XGBoostingModel.transform(DataForTesting)

# here we will evaluating the model with the same metrics we used above
AUCScore = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
AccScore = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# define the metrics then display them
XGBoosingAUC = AUCScore.evaluate(XGBoostingPredictions)
XGBoostingAcc = AccScore.evaluate(XGBoostingPredictions)

print(f"Gradient Boosted Tree Model AUC: {XGBoosingAUC:.4f}")
print(f"Gradient Boosted Tree Model Accuracy: {XGBoostingAcc:.4f}")

# here we are doing a confusion matrix just like we did above 
XGBoosingPred = XGBoostingPredictions.select("prediction", "label").rdd.map(lambda row: (float(row.prediction), float(row.label)))
XGBoostingMetrics = MulticlassMetrics(XGBoosingPred)

print("Confusion Matrix:\n", XGBoostingMetrics.confusionMatrix().toArray())
print("Precision (Fraud):", XGBoostingMetrics.precision(1.0))
print("Recall (Fraud):", XGBoostingMetrics.recall(1.0))
print("F1-Score (Fraud):", XGBoostingMetrics.fMeasure(1.0))

# here we will display the predictions of the first 10
XGBoostingPredictions.select("label", "prediction", "probability", "features").show(10)


Gradient Boosted Tree Model AUC: 0.8213
Gradient Boosted Tree Model Accuracy: 0.8242
Confusion Matrix:
 [[166.  30.]
 [ 15.  45.]]
Precision (Fraud): 0.6
Recall (Fraud): 0.75
F1-Score (Fraud): 0.6666666666666665
+-----+----------+--------------------+--------------------+
|label|prediction|         probability|            features|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[0.95024433099390...|(137,[1,2,4,8,14,...|
|  1.0|       0.0|[0.98305639437189...|(137,[1,3,4,5,16,...|
|  0.0|       0.0|[0.95811746557606...|(137,[1,2,8,17,32...|
|  1.0|       1.0|[0.40127673857769...|(137,[0,4,13,45,4...|
|  0.0|       0.0|[0.95658065827562...|(137,[0,2,4,8,11,...|
|  0.0|       0.0|[0.92782431679876...|(137,[6,12,30,47,...|
|  0.0|       1.0|[0.37390401431447...|(137,[8,34,44,48,...|
|  0.0|       0.0|[0.95259013828628...|(137,[1,3,4,8,12,...|
|  1.0|       1.0|[0.29109246202187...|(137,[3,9,19,36,4...|
|  1.0|       1.0|[0.10320104804378...|(137,[0,4,9,11,24