In [1]:
import os

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Obtain the notebook's Spark session (an existing one is reused if present),
# registered under the application name "Fraud Detection".
spark = (
    SparkSession.builder
    .appName("Fraud Detection")
    .getOrCreate()
)

In [2]:
# Echo the session object so the notebook renders it as the cell output.
spark

In [3]:
# Location of the transactions CSV. The default is the original local path;
# set the FRAUD_DATA_PATH environment variable to point the notebook at the
# file on another machine instead of editing this cell.
DATA_PATH = os.environ.get(
    "FRAUD_DATA_PATH", "file:///home/nitin/Datasets/fraud_detection.csv"
)

# The first row is the header; column types are inferred from the data.
df = spark.read.csv(DATA_PATH, inferSchema=True, header=True)

In [4]:
# Print the inferred column names, types and nullability of the loaded frame.
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [5]:
# Peek at the first two rows of the loaded data.
df.show(n=2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



In [6]:
# Keep only the transaction type, the amount, the originator's balances
# before/after, and the fraud label; every other column is dropped.
df = df.select(
    [
        "type",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "isFraud",
    ]
)

In [7]:
# Confirm the projection: first two rows of the reduced frame.
df.show(n=2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



### Train/test split

In [8]:
# Random 70/30 split; the fixed seed keeps the partition the same across runs.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=7)

In [9]:
# Report the row count of each split (each count() triggers a Spark job).
print(
    f"Train set length: {train.count()} records",
    f"Test set length: {test.count()} records",
    sep="\n",
)

Train set length: 4451877 records
Test set length: 1910743 records


In [10]:
# First two rows of the training split.
train.show(n=2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



In [11]:
# List (column name, Spark type name) pairs; the next cell uses these type
# names to separate categorical from numeric columns.
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [12]:
# Partition the feature columns by Spark dtype name:
#   - "string" columns are treated as categorical,
#   - "double" columns are treated as numeric.
catCols = [name for name, dtype in train.dtypes if dtype == "string"]

# Use the boolean operator `and` rather than bitwise `&`: the operands are
# plain Python bools, and `and` short-circuits and states the intent.
# The label column is excluded explicitly so it can never become a feature.
numCols = [
    name
    for name, dtype in train.dtypes
    if dtype == "double" and name != "isFraud"
]

In [13]:
# Show the numeric column names, then the categorical ones, one list per line.
print(numCols, catCols, sep="\n")

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


In [14]:
# Number of distinct transaction types in the training split.
(
    train
    .agg(F.countDistinct(F.col("type")))
    .show()
)

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [15]:
# Row count per transaction type in the training split.
(
    train
    .groupBy("type")
    .count()
    .show()
)

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372791|
| CASH_IN| 979384|
|CASH_OUT|1565928|
| PAYMENT|1504894|
|   DEBIT|  28880|
+--------+-------+



In [16]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [17]:
# One StringIndexer per categorical column, writing to "<column>_StringIndexer".
# handleInvalid="skip" is passed through unchanged.
string_indexer = [
    StringIndexer(
        inputCol=cat_col,
        outputCol=f"{cat_col}_StringIndexer",
        handleInvalid="skip",
    )
    for cat_col in catCols
]

In [18]:
# Echo the list of indexer stages built above.
string_indexer

[StringIndexer_2191760b0bda]

In [19]:
# A single OneHotEncoder that handles every categorical column at once:
# it reads each "<column>_StringIndexer" and writes "<column>_OneHotEncoder".
# It is wrapped in a list so it can be concatenated with the other stages.
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[cat_col + "_StringIndexer" for cat_col in catCols],
        outputCols=[cat_col + "_OneHotEncoder" for cat_col in catCols],
    )
]

In [20]:
# Echo the list holding the encoder stage built above.
one_hot_encoder

[OneHotEncoder_cbfec0e688a1]

### Vector assembling




In [21]:
from pyspark.ml.feature import VectorAssembler

In [22]:
# Columns fed to the VectorAssembler: the numeric columns first (copied into a
# new list), followed by the one-hot encoded output of each categorical column.
assemblerInput = list(numCols) + [
    f"{cat_col}_OneHotEncoder" for cat_col in catCols
]

In [23]:
# Echo the final list of assembler input column names.
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [24]:
# Combine all input columns into one vector column named
# "VectorAssembler_features".
vector_assembler = (
    VectorAssembler()
    .setInputCols(assemblerInput)
    .setOutputCol("VectorAssembler_features")
)

In [25]:
# Pipeline stages in execution order: index the categorical columns,
# one-hot encode the indices, then assemble the feature vector.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]


In [26]:
# Echo the ordered list of pipeline stages.
stages

[StringIndexer_2191760b0bda,
 OneHotEncoder_cbfec0e688a1,
 VectorAssembler_6a7a5d2404d1]

In [27]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

CPU times: user 7.7 ms, sys: 30.2 ms, total: 37.9 ms
Wall time: 12.2 s


In [28]:
# Show the original columns next to the assembled feature vector, without
# truncating the vector text.
(
    pp_df
    .select(
        "type",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "VectorAssembler_features",
    )
    .show(truncate=False)
)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|4.58  |94241.0      |94245.58      |[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]            |
|CASH_IN|5.44  |0.0          |5.44          |(7,[0,2,5],[5.44,5.44,1.0])                        |
|CASH_IN|6.07  |400680.0     |400686.07     |[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]          |
|CASH_IN|6.76  |11322.0      |11328.76      |[6.76,11322.0,11328.76,0.0,0.0,1.0,0.0]            |
|CASH_IN|8.27  |8428410.94   |8428419.21    |[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]       |
|CASH_IN|8.44  |39384.0      |39392.44      |[8.44,39384.0,39392.44,0.0,0.0,1.0,0.0]            |
|CASH_IN|11.13 |4116439.74   |4116450.87    |[11.13,4116439.74,4116450.87,0.0,0.0,1.0,0.0]      |
|CASH_IN|12.79 |6017

### Logistic Regression

In [29]:
from pyspark.ml.classification import LogisticRegression

In [30]:
# Reduce the preprocessed frame to the two columns the classifier consumes,
# renamed to "features" and "label".
data = pp_df.select(
    pp_df["VectorAssembler_features"].alias("features"),
    pp_df["isFraud"].alias("label"),
)

In [31]:
# First five feature/label rows, with the vectors shown in full.
data.show(n=5, truncate=False)

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]     |0    |
|(7,[0,2,5],[5.44,5.44,1.0])                 |0    |
|[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]   |0    |
|[6.76,11322.0,11328.76,0.0,0.0,1.0,0.0]     |0    |
|[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]|0    |
+--------------------------------------------+-----+
only showing top 5 rows



In [32]:
%%time
model = LogisticRegression().fit(data)

CPU times: user 1.44 ms, sys: 27.1 ms, total: 28.6 ms
Wall time: 39.9 s


In [33]:
# Area under the ROC curve taken from the fitted model's summary.
# NOTE(review): the summary presumably describes the data passed to fit(),
# not a separate held-out set — confirm before quoting this as a test score.
model.summary.areaUnderROC

0.9928715755672283

In [34]:
# Display the recall/precision pairs from the model summary; show() prints
# only the leading rows of the table.
model.summary.pr.show()

+------------------+-------------------+
|            recall|          precision|
+------------------+-------------------+
|               0.0| 0.9127789046653144|
|0.3598560575769692| 0.9127789046653144|
|0.4838064774090364| 0.6813063063063063|
|0.5501799280287885| 0.5362431800467654|
| 0.594562175129948| 0.4430870083432658|
|0.6309476209516194|0.38060781476121563|
|0.6613354658136745|0.33508914100486226|
|0.6889244302279088| 0.3009081383164513|
|0.7113154738104758|0.27302025782688766|
|0.7305077968812475|0.25006843690117714|
|0.7477009196321471|0.23097826086956522|
|0.7616953218712516|0.21438217420661715|
|0.7752898840463814| 0.2003927242662257|
|0.7860855657736905|0.18784635964074145|
|0.7968812475009996|0.17706112295664533|
|0.8000799680127949|0.16611323260833472|
|0.8036785285885646|0.15659083826737302|
|0.8096761295481807|0.14861294583883752|
|0.8128748500599761|0.14102386237513873|
|0.8164734106357457| 0.1342890964093121|
+------------------+-------------------+
only showing top