## Fraud Detection on PaySim Dataset



### Dataset:

PaySim is a public, synthetically generated dataset. It is built from aggregated data taken from a private dataset so that the simulated transactions resemble normal operation, and it then injects malicious behaviour so that fraud detection methods can be evaluated against it.

### Features Used:

- **type** - one of CASH_IN, CASH_OUT, DEBIT, PAYMENT or TRANSFER.
- **amount** - amount of the transaction in local currency.
- **nameOrig** - customer who started the transaction.
- **oldbalanceOrg** - originator's balance before the transaction.
- **newbalanceOrig** - originator's balance after the transaction.
- **nameDest** - customer who is the recipient of the transaction.
- **oldbalanceDest** - recipient's balance before the transaction. No information is available for recipients whose name starts with M (merchants).
- **newbalanceDest** - recipient's balance after the transaction. No information is available for recipients whose name starts with M (merchants).
- **isFraud** - marks the transactions made by the fraudulent agents inside the simulation. In this dataset, the fraudulent agents aim to profit by taking control of customers' accounts and trying to empty the funds by transferring them to another account and then cashing out of the system.



In [79]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [80]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [81]:
! mkdir -p ~/.kaggle  # create the kaggle directory; -p avoids an error if it already exists
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json


In [82]:
!kaggle datasets download -d ealaxi/paysim1

paysim1.zip: Skipping, found more recently modified local copy (use --force to force download)


In [83]:
from zipfile import ZipFile

dataset='/content/paysim1.zip'
with ZipFile(dataset,'r') as zi:
  zi.extractall()
  print("Dataset is extracted")

Dataset is extracted


In [84]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [85]:
spark=SparkSession.builder.getOrCreate()

In [86]:
spark

In [87]:
# Path of the CSV file extracted from the archive
data_loc="/content/data.csv"

In [88]:
print(data_loc)

/content/data.csv


In [89]:
df=spark.read.option('header','true').csv(data_loc)

In [90]:
df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



In [91]:
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [92]:
df=df.select("type","amount","oldbalanceOrg","newbalanceOrig","isFraud")

In [93]:
df.show(5)

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
| PAYMENT| 9839.64|     170136.0|     160296.36|      0|
| PAYMENT| 1864.28|      21249.0|      19384.72|      0|
|TRANSFER|   181.0|        181.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
| PAYMENT|11668.14|      41554.0|      29885.86|      0|
+--------+--------+-------------+--------------+-------+
only showing top 5 rows



Note: 'type' is a categorical column and requires preprocessing (indexing and one-hot encoding) before modelling.

The columns amount, oldbalanceOrg and newbalanceOrig hold decimal values and isFraud is a 0/1 label, but all of them were read as strings. Use .cast() to convert the first three to double and isFraud to int.

In [94]:
df.dtypes

[('type', 'string'),
 ('amount', 'string'),
 ('oldbalanceOrg', 'string'),
 ('newbalanceOrig', 'string'),
 ('isFraud', 'string')]

In [95]:
from pyspark.sql.functions import col

In [96]:
df = df.withColumn("amount_double", col("amount").cast("double")).drop("amount")

In [97]:
df.dtypes

[('type', 'string'),
 ('oldbalanceOrg', 'string'),
 ('newbalanceOrig', 'string'),
 ('isFraud', 'string'),
 ('amount_double', 'double')]

In [98]:
df = df.withColumn("oldbalanceOrg", col("oldbalanceOrg").cast("double"))\
      .withColumn("newbalanceOrig",col("newbalanceOrig").cast("double"))\
      .withColumn("isFraud",col("isFraud").cast("int"))

In [99]:
df.dtypes

[('type', 'string'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int'),
 ('amount_double', 'double')]

In [100]:
df = df.withColumnRenamed("amount_double", "amount")

In [101]:
df.show(5)

+--------+-------------+--------------+-------+--------+
|    type|oldbalanceOrg|newbalanceOrig|isFraud|  amount|
+--------+-------------+--------------+-------+--------+
| PAYMENT|     170136.0|     160296.36|      0| 9839.64|
| PAYMENT|      21249.0|      19384.72|      0| 1864.28|
|TRANSFER|        181.0|           0.0|      1|   181.0|
|CASH_OUT|        181.0|           0.0|      1|   181.0|
| PAYMENT|      41554.0|      29885.86|      0|11668.14|
+--------+-------------+--------------+-------+--------+
only showing top 5 rows



## Train Test Split

In [102]:
#Train Test Split - Random Split
train,test=df.randomSplit([0.7,0.3],seed=7)

In [103]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4451490 records
Test set length: 1911130 records
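
The split is not exactly 70/30: 4,451,490 of the 6,362,620 rows (about 69.96%) ended up in the training set. That is expected when each row is assigned to a split by an independent random draw rather than by counting rows. The sketch below illustrates the idea in plain Python. It is not Spark's implementation, and `random_split` is a hypothetical helper written only for this illustration.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    # Cumulative, normalised boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)  # fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last bucket also catches any floating-point rounding gap.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(100_000), [0.7, 0.3], seed=7)
print(len(train_rows), len(test_rows))
print(round(len(train_rows) / 100_000, 4))
```

The two sizes always add up to the full row count, but the training fraction only approximates 0.7, which mirrors the counts printed above.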


In [104]:
train.show(5)

+-------+-------------+--------------+-------+------+
|   type|oldbalanceOrg|newbalanceOrig|isFraud|amount|
+-------+-------------+--------------+-------+------+
|CASH_IN|          0.0|          5.44|      0|  5.44|
|CASH_IN|          0.0|         16.89|      0| 16.89|
|CASH_IN|          0.0|         53.63|      0| 53.63|
|CASH_IN|          0.0|         63.56|      0| 63.56|
|CASH_IN|          0.0|        168.96|      0|168.96|
+-------+-------------+--------------+-------+------+
only showing top 5 rows



In [105]:
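# Categorical columns are the string-typed ones; numeric feature columns are the doubles (the label is excluded)
catCols=[x for (x,dataType) in train.dtypes if dataType=="string"]
numCols=[x for (x,dataType) in train.dtypes if dataType=="double" and x!="isFraud"]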

In [106]:
print(numCols)
print(catCols)

['oldbalanceOrg', 'newbalanceOrig', 'amount']
['type']


## One Hot Encoding

In [107]:
# Count the distinct categories in 'type' before indexing (StringIndexer maps each category to a numeric index)

train.agg(F.countDistinct("type")).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [108]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 373084|
| CASH_IN| 979536|
|CASH_OUT|1566112|
| PAYMENT|1503731|
|   DEBIT|  29027|
+--------+-------+



In [109]:
from pyspark.ml.feature import (OneHotEncoder,StringIndexer)

In [110]:
string_indexer=[
    StringIndexer(inputCol=x,outputCol=x+"_StringIndexer",handleInvalid="skip")
    for x in catCols
]

In [111]:
string_indexer

[StringIndexer_a4444101dbbe]

In [112]:
one_hot_encoder=[
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [113]:
one_hot_encoder

[OneHotEncoder_0cb0c5dc9f42]
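
To make the two stages concrete, here is a plain-Python sketch of what they do to 'type', assuming the Spark defaults are in effect: StringIndexer orders labels by descending frequency (`stringOrderType="frequencyDesc"`) and OneHotEncoder drops the last category (`dropLast=True`). The helper names `fit_string_index` and `one_hot_drop_last` are hypothetical, and breaking frequency ties alphabetically is an assumption of this sketch.

```python
# Category counts copied from the groupBy("type") output above.
type_counts = {
    "TRANSFER": 373084,
    "CASH_IN": 979536,
    "CASH_OUT": 1566112,
    "PAYMENT": 1503731,
    "DEBIT": 29027,
}

def fit_string_index(counts):
    """Map labels to indices, most frequent label first."""
    # Ties are broken alphabetically (an assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot_drop_last(index, n_categories):
    """One-hot vector of length n_categories - 1; the last index maps to all zeros."""
    vec = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vec[index] = 1.0
    return vec

type_index = fit_string_index(type_counts)
for label, idx in type_index.items():
    print(label, idx, one_hot_drop_last(idx, len(type_index)))
```

Under these assumptions CASH_IN gets index 2, so after the three numeric columns it occupies position 3 + 2 = 5 of the assembled feature vector, which agrees with the `(7,[1,2,5],...)` rows in the transformed output further down. DEBIT, the rarest type, is represented by an all-zero vector.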

## Vector Assembler 

In [114]:
# VectorAssembler combines the values of the input columns into a single vector
from pyspark.ml.feature import VectorAssembler

In [115]:
assemblerInput=[x for x in numCols]
assemblerInput+=[f"{x}_OneHotEncoder" for x in catCols]

In [116]:
assemblerInput

['oldbalanceOrg', 'newbalanceOrig', 'amount', 'type_OneHotEncoder']

In [117]:
vector_assembler=VectorAssembler(
    inputCols=assemblerInput,outputCol="VectorAssembler_features"
)
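
The assembled column is printed by Spark in sparse form, `(size,[indices],[values])`, listing only the non-zero entries. The sketch below reproduces that notation in plain Python for one row. `assemble_sparse` is a hypothetical helper, the one-hot position of CASH_IN is assumed from the default encoder settings, and unlike Spark (which chooses between dense and sparse storage itself) it always returns the sparse form.

```python
def assemble_sparse(numeric_values, one_hot):
    """Concatenate the inputs and return (size, indices, values) of the non-zero entries."""
    dense = list(numeric_values) + list(one_hot)
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values

# A CASH_IN row with oldbalanceOrg=0.0, newbalanceOrig=22.31, amount=22.31.
# The one-hot vector has 4 slots; CASH_IN is assumed to sit at slot 2.
row = assemble_sparse([0.0, 22.31, 22.31], [0.0, 0.0, 1.0, 0.0])
print(row)  # (7, [1, 2, 5], [22.31, 22.31, 1.0])
```

Index 0 (oldbalanceOrg) is missing from the index list because its value is zero, not because the column was dropped.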

## Pipeline

In [118]:
#create a pipeline for preprocessing stages
stages=[]
stages+=string_indexer
stages+=one_hot_encoder
stages+=[vector_assembler]

In [119]:
stages

[StringIndexer_a4444101dbbe,
 OneHotEncoder_0cb0c5dc9f42,
 VectorAssembler_25bcb018d9b5]

In [120]:
%%time
from pyspark.ml import Pipeline

# Fit the preprocessing stages on the training split only, then apply them to the test split
pipeline=Pipeline().setStages(stages)
model=pipeline.fit(train)

pp_df=model.transform(test)

CPU times: user 273 ms, sys: 47.9 ms, total: 321 ms
Wall time: 41.3 s


In [121]:
pp_df.select(
    "type","amount","oldbalanceOrg","newbalanceOrig","VectorAssembler_features",   
).show(truncate=False)

+-------+-------+-------------+--------------+---------------------------------+
|type   |amount |oldbalanceOrg|newbalanceOrig|VectorAssembler_features         |
+-------+-------+-------------+--------------+---------------------------------+
|CASH_IN|22.31  |0.0          |22.31         |(7,[1,2,5],[22.31,22.31,1.0])    |
|CASH_IN|130.24 |0.0          |130.24        |(7,[1,2,5],[130.24,130.24,1.0])  |
|CASH_IN|367.6  |0.0          |367.6         |(7,[1,2,5],[367.6,367.6,1.0])    |
|CASH_IN|430.61 |0.0          |430.61        |(7,[1,2,5],[430.61,430.61,1.0])  |
|CASH_IN|500.71 |0.0          |500.71        |(7,[1,2,5],[500.71,500.71,1.0])  |
|CASH_IN|1020.78|0.0          |1020.78       |(7,[1,2,5],[1020.78,1020.78,1.0])|
|CASH_IN|1117.04|0.0          |1117.04       |(7,[1,2,5],[1117.04,1117.04,1.0])|
|CASH_IN|1187.82|0.0          |1187.82       |(7,[1,2,5],[1187.82,1187.82,1.0])|
|CASH_IN|1279.33|0.0          |1279.33       |(7,[1,2,5],[1279.33,1279.33,1.0])|
|CASH_IN|1561.31|0.0        

## Logistic Regression

In [124]:
from pyspark.ml.classification import LogisticRegression

In [125]:
data=pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [126]:
data.show(5,truncate=False)

+-------------------------------+-----+
|features                       |label|
+-------------------------------+-----+
|(7,[1,2,5],[22.31,22.31,1.0])  |0    |
|(7,[1,2,5],[130.24,130.24,1.0])|0    |
|(7,[1,2,5],[367.6,367.6,1.0])  |0    |
|(7,[1,2,5],[430.61,430.61,1.0])|0    |
|(7,[1,2,5],[500.71,500.71,1.0])|0    |
+-------------------------------+-----+
only showing top 5 rows



In [127]:
%%time
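# Note: 'data' is built from pp_df (the transformed test split), so the summary metrics
# below are computed on the same rows the model was fitted on. This also rebinds 'model'.
model=LogisticRegression().fit(data)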

CPU times: user 611 ms, sys: 96 ms, total: 707 ms
Wall time: 2min 3s
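
For orientation, a fitted binary logistic regression scores a row as sigmoid(w · x + b) and, with the default threshold of 0.5, labels it as fraud when that probability is at least 0.5. The sketch below shows the computation on one sparse row. The coefficients are hypothetical placeholders, not the values learned in this notebook, and `predict_proba` is a helper name chosen for the illustration.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)

def predict_proba(weights, intercept, sparse_row):
    """Fraud probability for one row given as (size, indices, values)."""
    _, indices, values = sparse_row
    # Only non-zero features contribute to the weighted sum.
    z = intercept + sum(weights[i] * v for i, v in zip(indices, values))
    return sigmoid(z)

# Hypothetical coefficients, NOT the ones fitted above.
weights = [1e-4, -1e-4, 5e-5, -2.0, -6.0, -6.0, 1.0]
intercept = -7.0

row = (7, [1, 2, 5], [22.31, 22.31, 1.0])
p = predict_proba(weights, intercept, row)
print(p, p >= 0.5)
```

The real coefficients can be read from the fitted model's `coefficients` and `intercept` attributes.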


In [128]:
model.summary.areaUnderROC

0.9924047180812329
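
An area under the ROC curve close to 1 can be read as: a randomly chosen fraudulent row almost always receives a higher score than a randomly chosen legitimate one. The sketch below computes that rank statistic directly on a tiny made-up example. The labels and scores are illustrative only, and the quadratic loop is meant for explanation, not for datasets of this size.

```python
def roc_auc(labels, scores):
    """Probability that a random positive outscores a random negative (ties count 0.5)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Made-up labels and scores: 2 positives, 6 negatives.
labels = [0, 0, 0, 0, 1, 0, 1, 0]
scores = [0.01, 0.02, 0.03, 0.40, 0.35, 0.05, 0.90, 0.10]
auc = roc_auc(labels, scores)
print(auc)  # 11 of the 12 positive/negative pairs are ranked correctly
```

Because this statistic ignores how rare the positive class is, it can stay high while precision is low, so it is worth reading alongside the precision-recall table that follows.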

In [129]:
model.summary.pr.show()

+------------------+-------------------+
|            recall|          precision|
+------------------+-------------------+
|               0.0|   0.91701244813278|
|0.3695652173913043|   0.91701244813278|
|0.4887123745819398| 0.6660968660968661|
|0.5468227424749164| 0.5137470542026709|
|0.5915551839464883|0.42403356308061135|
|             0.625|0.36216085271317827|
|0.6609531772575251|0.32140678999796707|
|0.6860367892976589| 0.2873905429071804|
|0.7077759197324415| 0.2604214736194432|
|0.7290969899665551|0.23916620954470652|
|0.7483277591973244|0.22145243102808362|
|0.7663043478260869|0.20655848546315078|
|0.7792642140468228|0.19286083807553026|
| 0.794314381270903|0.18171384850803365|
|0.8051839464882943| 0.1712456655108029|
|0.8068561872909699|0.16032563548762252|
|0.8093645484949833|0.15090809883856887|
|0.8122909698996655|0.14265785609397943|
|0.8152173913043478|0.13531330233849143|
| 0.818561872909699| 0.1287988422575977|
+------------------+-------------------+
only showing top
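
Each row of the table above corresponds to one decision threshold: lowering the threshold flags more transactions, which raises recall and usually lowers precision. A minimal sketch of that trade-off on made-up scores follows. The data is illustrative only, `precision_recall_at` is a hypothetical helper, and returning a precision of 1.0 when nothing is flagged is a convention chosen for this sketch.

```python
def precision_recall_at(labels, scores, threshold):
    """Precision and recall when every score >= threshold is flagged as fraud."""
    tp = fp = fn = 0
    for y, s in zip(labels, scores):
        flagged = s >= threshold
        if flagged and y == 1:
            tp += 1
        elif flagged and y == 0:
            fp += 1
        elif not flagged and y == 1:
            fn += 1
    # Convention for this sketch: precision is 1.0 when nothing is flagged.
    precision = tp / (tp + fp) if (tp + fp) else 1.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall

# Made-up labels and scores: 3 fraudulent rows among 8.
labels = [1, 1, 0, 1, 0, 0, 0, 0]
scores = [0.95, 0.80, 0.70, 0.40, 0.30, 0.20, 0.10, 0.05]

for threshold in (0.9, 0.5, 0.25):
    print(threshold, precision_recall_at(labels, scores, threshold))
```

In this toy example recall climbs from 1/3 to 1.0 as the threshold drops from 0.9 to 0.25, while precision falls from 1.0 to 0.6.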