TRANSACTION FRAUD DETECTION

In [None]:
!pip install pyspark



Import libraries

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, desc
from pyspark.sql.functions import min, max
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

Create Spark session

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').appName('Transaction fraud detection').getOrCreate()

Load the dataset

In [None]:
data = spark.read.csv('/content/drive/MyDrive/fraud_0.1origbase.csv',header=True,inferSchema=True)

In [None]:
data.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [None]:
data.describe().show()

+-------+-----------------+--------+-----------------+-----------+-----------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|             step|    type|           amount|   nameOrig|    oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+-----------------+--------+-----------------+-----------+-----------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           636262|  636262|           636262|     636262|           636262|           636262|     636262|            636262|            636262|              636262|              636262|
|   mean|242.9319352719477|    NULL|180058.5194135685|       NULL|831793.6519234563|852835.3756044391|       NULL|1096212.2446619945|1221808.5280917739|0.001290348944302...|3.143359182223675...|
| stddev|142.330856751192

In [None]:
data.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

Actions

In [None]:
rdd1 = data.count()
print(f"Number of rows in data : {rdd1}")

Number of rows in data : 636262


In [None]:
data.show(10,False)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|type    |amount   |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|283 |CASH_IN |210329.84|C1159819632|3778062.79   |3988392.64    |C1218876138|1519266.6     |1308936.76    |0      |0             |
|132 |CASH_OUT|215489.19|C1372369468|21518.0      |0.0           |C467105520 |6345756.55    |6794954.89    |0      |0             |
|355 |DEBIT   |4431.05  |C1059822709|20674.0      |16242.95      |C76588246  |80876.56      |85307.61      |0      |0             |
|135 |CASH_OUT|214026.2 |C1464960643|46909.73     |0.0           |C1059379810|1.346745036E7 |1.368147656E7 |0      |0             |
|381 |CASH_OUT|8858.45  |C831134427 |0.0          |0.0           |C579876929

In [None]:
rdd2 =data.first()
print(f"The first element of dataset: {rdd2}")

The first element of dataset: Row(step=283, type='CASH_IN', amount=210329.84, nameOrig='C1159819632', oldbalanceOrg=3778062.79, newbalanceOrig=3988392.64, nameDest='C1218876138', oldbalanceDest=1519266.6, newbalanceDest=1308936.76, isFraud=0, isFlaggedFraud=0)


In [None]:
first_five_rows = data.take(5)
first_five_rows

[Row(step=283, type='CASH_IN', amount=210329.84, nameOrig='C1159819632', oldbalanceOrg=3778062.79, newbalanceOrig=3988392.64, nameDest='C1218876138', oldbalanceDest=1519266.6, newbalanceDest=1308936.76, isFraud=0, isFlaggedFraud=0),
 Row(step=132, type='CASH_OUT', amount=215489.19, nameOrig='C1372369468', oldbalanceOrg=21518.0, newbalanceOrig=0.0, nameDest='C467105520', oldbalanceDest=6345756.55, newbalanceDest=6794954.89, isFraud=0, isFlaggedFraud=0),
 Row(step=355, type='DEBIT', amount=4431.05, nameOrig='C1059822709', oldbalanceOrg=20674.0, newbalanceOrig=16242.95, nameDest='C76588246', oldbalanceDest=80876.56, newbalanceDest=85307.61, isFraud=0, isFlaggedFraud=0),
 Row(step=135, type='CASH_OUT', amount=214026.2, nameOrig='C1464960643', oldbalanceOrg=46909.73, newbalanceOrig=0.0, nameDest='C1059379810', oldbalanceDest=13467450.36, newbalanceDest=13681476.56, isFraud=0, isFlaggedFraud=0),
 Row(step=381, type='CASH_OUT', amount=8858.45, nameOrig='C831134427', oldbalanceOrg=0.0, newbala

Transformations

In [None]:
filtered_data = data.filter(data["isFraud"] == 1)
filtered_data.count()

821

In [None]:
sorted_data = data.orderBy("amount")
sorted_data.show()

+----+--------+------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
| 741|CASH_OUT|   0.0| C312737633|          0.0|           0.0|C1400061387|     267522.87|     267522.87|      1|             0|
| 646|CASH_OUT|   0.0|C2060908932|          0.0|           0.0|C1587892888|           0.0|           0.0|      1|             0|
| 300| PAYMENT|  0.17| C757209321|     276954.0|     276953.83|M2118236430|           0.0|           0.0|      0|             0|
| 277| PAYMENT|   0.3| C986345541|      80499.0|       80498.7| M122005350|           0.0|           0.0|      0|             0|
| 300| PAYMENT|  0.37| C494444343|          0.0|           0.0|M1722927281|           0.0|       

In [None]:
data = data.drop('nameOrig')

In [None]:
data.groupBy("type").count().show(truncate=False)

+--------+------+
|type    |count |
+--------+------+
|TRANSFER|53294 |
|CASH_IN |139614|
|CASH_OUT|224216|
|PAYMENT |214968|
|DEBIT   |4170  |
+--------+------+



Preprocessing

In [None]:
data.columns

['step',
 'type',
 'amount',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [None]:
print(f'Number of rows and column in dataframe: {data.toPandas().shape}')

Number of rows and column in dataframe: (636262, 10)


In [None]:
data = data.withColumn('diff_new_old_balance',col('newbalanceOrig')-col('oldbalanceOrg'))

In [None]:
data = data.withColumn('diff_new_old_destiny',col('newbalanceDest')-col('oldbalanceDest'))

In [None]:
featurecol = ['step','amount','oldbalanceOrg','newbalanceOrig','oldbalanceDest','newbalanceDest','diff_new_old_balance','diff_new_old_destiny']

Vector Assembler

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = featurecol,  outputCol = 'features')
output = assembler.transform(data)

Standardize the data

In [None]:
from pyspark.ml.feature import StandardScaler
scale=StandardScaler (inputCol='features', outputCol='standardized')
data_scale=scale.fit(output)
data_scale_output=data_scale.transform(output)
data_scale_output.show(10, False)

+----+--------+---------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------------+--------------------+--------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|step|type    |amount   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|diff_new_old_balance|diff_new_old_destiny|features                                                                                          |standardized                                                                                                                                                         |
+----+--------+---------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------------+------

In [None]:
final_data = data_scale_output.select('features','isFraud')
final_data.show()

+--------------------+-------+
|            features|isFraud|
+--------------------+-------+
|[283.0,210329.84,...|      0|
|[132.0,215489.19,...|      0|
|[355.0,4431.05,20...|      0|
|[135.0,214026.2,4...|      0|
|[381.0,8858.45,0....|      0|
|[208.0,256440.86,...|      0|
|[347.0,120989.98,...|      0|
|[183.0,62655.01,1...|      0|
|[184.0,256745.11,...|      0|
|(8,[0,1,2,6],[12....|      0|
|(8,[0,1],[15.0,86...|      0|
|(8,[0,1],[186.0,1...|      0|
|[321.0,147708.56,...|      0|
|[691.0,41882.88,7...|      0|
|[239.0,749.39,0.0...|      0|
|[163.0,126511.11,...|      0|
|[350.0,230581.53,...|      0|
|[137.0,317575.58,...|      0|
|[188.0,87633.89,2...|      0|
|[159.0,254252.74,...|      0|
+--------------------+-------+
only showing top 20 rows



Splitting the data into train and test

In [None]:
train_data,test_data = final_data.randomSplit([0.7,0.3])
train_data.describe().show()
test_data.describe().show()

+-------+--------------------+
|summary|             isFraud|
+-------+--------------------+
|  count|              445491|
|   mean|0.001313157841572557|
| stddev| 0.03621375984146948|
|    min|                   0|
|    max|                   1|
+-------+--------------------+

+-------+--------------------+
|summary|             isFraud|
+-------+--------------------+
|  count|              190771|
|   mean|0.001237085301225029|
| stddev|  0.0351505533079575|
|    min|                   0|
|    max|                   1|
+-------+--------------------+



model 1 - LogisticRegression

In [None]:
from pyspark.ml.classification import LogisticRegression
classifier1 = LogisticRegression(maxIter = 100,regParam = 0.1,labelCol = 'isFraud',featuresCol='features')
model = classifier1.fit(train_data)
pred_data = model.transform(test_data)
pred_data.show(5)

+--------------------+-------+--------------------+--------------------+----------+
|            features|isFraud|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1],[1.0,181...|      0|[6.68042566919884...|[0.99874633012070...|       0.0|
|(8,[0,1],[1.0,305...|      0|[6.68038681354771...|[0.99874628146867...|       0.0|
|(8,[0,1],[1.0,346...|      0|[6.68037402317281...|[0.99874626545314...|       0.0|
|(8,[0,1],[1.0,456...|      0|[6.68033979827220...|[0.99874622259726...|       0.0|
|(8,[0,1],[1.0,625...|      0|[6.68028698738729...|[0.99874615646544...|       0.0|
+--------------------+-------+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='isFraud')
print(evaluator.evaluate(pred_data))

0.5254237288135594


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation

paramGrid = (ParamGridBuilder()
             .addGrid(classifier1.regParam, [0.1, 0.3, 0.5])
             .addGrid(classifier1.elasticNetParam, [0.0, 0.1, 0.2])
             .addGrid(model.maxIter, [10, 20, 50])
             .build())

In [None]:
cv= CrossValidator (estimator=classifier1,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=5)
cvModel = cv.fit(train_data)
predictions1 = cvModel.transform(test_data)
print(evaluator.evaluate(predictions1))

0.5254237288135594


model 2 - DecisionTreeClassifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
classifier2 = DecisionTreeClassifier(labelCol = 'isFraud', featuresCol='features')
model2 = classifier2.fit(train_data)
pred_data2 = model2.transform(test_data)
pred_data2.show(10,False)

+------------------------+-------+---------------+-----------------------------------------+----------+
|features                |isFraud|rawPrediction  |probability                              |prediction|
+------------------------+-------+---------------+-----------------------------------------+----------+
|(8,[0,1],[1.0,1810.41]) |0      |[417495.0,78.0]|[0.9998132063136266,1.867936863734006E-4]|0.0       |
|(8,[0,1],[1.0,3056.61]) |0      |[417495.0,78.0]|[0.9998132063136266,1.867936863734006E-4]|0.0       |
|(8,[0,1],[1.0,3466.83]) |0      |[417495.0,78.0]|[0.9998132063136266,1.867936863734006E-4]|0.0       |
|(8,[0,1],[1.0,4564.51]) |0      |[417495.0,78.0]|[0.9998132063136266,1.867936863734006E-4]|0.0       |
|(8,[0,1],[1.0,6258.29]) |0      |[417495.0,78.0]|[0.9998132063136266,1.867936863734006E-4]|0.0       |
|(8,[0,1],[1.0,7550.29]) |0      |[417495.0,78.0]|[0.9998132063136266,1.867936863734006E-4]|0.0       |
|(8,[0,1],[1.0,9496.55]) |0      |[417495.0,78.0]|[0.99981320631

In [None]:
print(evaluator.evaluate(pred_data2))

0.752108147308671


In [None]:
cv= CrossValidator (estimator=classifier2,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=5)
cvModel = cv.fit(train_data)
predictions2 = cvModel.transform(test_data)
print(evaluator.evaluate(predictions2))

0.7923099255823631


model 3 - RandomForestClassifier

In [None]:
classifier3 = RandomForestClassifier (numTrees=10, labelCol = 'isFraud',featuresCol='features')
model3=classifier3.fit(train_data)
pred_data3 = model.transform(test_data)
pred_data3.show(10,False)

+------------------------+-------+----------------------------------------+------------------------------------------+----------+
|features                |isFraud|rawPrediction                           |probability                               |prediction|
+------------------------+-------+----------------------------------------+------------------------------------------+----------+
|(8,[0,1],[1.0,1810.41]) |0      |[6.680425669198847,-6.680425669198847]  |[0.9987463301207051,0.001253669879294872] |0.0       |
|(8,[0,1],[1.0,3056.61]) |0      |[6.680386813547716,-6.680386813547716]  |[0.9987462814686717,0.001253718531328274] |0.0       |
|(8,[0,1],[1.0,3466.83]) |0      |[6.68037402317281,-6.68037402317281]    |[0.9987462654531436,0.0012537345468563998]|0.0       |
|(8,[0,1],[1.0,4564.51]) |0      |[6.680339798272206,-6.680339798272206]  |[0.9987462225972682,0.0012537774027318038]|0.0       |
|(8,[0,1],[1.0,6258.29]) |0      |[6.680286987387297,-6.680286987387297]  |[0.998746156465

In [None]:
print(evaluator.evaluate(pred_data3))

0.5254237288135594


In [None]:
cv= CrossValidator (estimator=classifier3,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=5)
cvModel = cv.fit(train_data)
predictions3 = cvModel.transform(test_data)
print(evaluator.evaluate(predictions3))

Exception ignored in: <function JavaWrapper.__del__ at 0x7dbaed496290>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


0.7415149310998854


INFERENCE :

Among the models
            

*   LogisticRegression()
*   DecisionTreeClassifier()
*   RandomForestClassifier()

DecisionTreeClassifier() has the highest evaluation score of 0.79



CONCLUSION:

The data is extremaly unbalanced, however it was possible to make all the data analysis and create with good scores.

The company may expect a revenue of R$ 57,251,574.44. This result may show the capacity of a project of data science and help the company.