In [188]:
!pip install pyspark



In [189]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import format_number
from pyspark.ml.feature import OneHotEncoder,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import PCA
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [190]:
df = spark.read.csv(r"/content/PS_20174392719_1491204439457_log.csv", inferSchema=True, header=True)

In [191]:
df.dtypes

[('step', 'int'),
 ('type', 'string'),
 ('amount', 'double'),
 ('nameOrig', 'string'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('nameDest', 'string'),
 ('oldbalanceDest', 'double'),
 ('newbalanceDest', 'double'),
 ('isFraud', 'int'),
 ('isFlaggedFraud', 'int')]

In [192]:
df.show(5)


+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [193]:
# To check which type of payments happens the most
df.groupBy("type").agg(F.count("type")).show()

+--------+-----------+
|    type|count(type)|
+--------+-----------+
|TRANSFER|     532909|
| CASH_IN|    1399284|
|CASH_OUT|    2237500|
| PAYMENT|    2151495|
|   DEBIT|      41432|
+--------+-----------+



In [194]:
#count of the transactions which are fraud and not fraud
df.groupBy("isFraud").agg(F.count("isFraud")).show()

+-------+--------------+
|isFraud|count(isFraud)|
+-------+--------------+
|      1|          8213|
|      0|       6354407|
+-------+--------------+



In [195]:
# count of the records flagged as fraud
df.filter(F.col("isFraud")=="1").count()


8213

In [196]:
#count of the records which are flagged as not fraud but are originally fraud
df.filter((F.col("isFlaggedFraud")=="0")&(F.col("isFraud")=="1")).count()

8197

In [197]:
#count of the records which are flagged as fraud and also originally fraud
df.filter((F.col("isFlaggedFraud")=="1")&(F.col("isFraud")=="1")).count()


16

In [198]:
#count of the records which are flagged as fraud and they are originally not fraud
df.filter((F.col("isFlaggedFraud")=="1")&(F.col("isFraud")=="0")).count()

0

In [199]:
# system showing frauds
df.filter(F.col("isFlaggedFraud")=="1").groupBy(F.col("type")).count().show()

+--------+-----+
|    type|count|
+--------+-----+
|TRANSFER|   16|
+--------+-----+



In [200]:
# actual frauds
df.filter(F.col("isFraud")=="1").groupBy(F.col("type")).count().show()

+--------+-----+
|    type|count|
+--------+-----+
|TRANSFER| 4097|
|CASH_OUT| 4116|
+--------+-----+



In [201]:
# Here we are correcting the column isFlaggedFraud since it is returning invalid results for the records which are actual fraud
df1 = df.withColumn("isFlaggedFraudNew", F.when((F.col("IsFraud")=="1"), "1").otherwise("0"))
df1.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|isFlaggedFraudNew|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|                0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|                0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|                1|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0

In [202]:
#count of the records which are flagged as fraud and also originally fraud
df1.filter((F.col("isFlaggedFraudNew")=="1")&(F.col("isFraud")=="1")).count()

8213

In [203]:
#we retrieved the ids of customers and the type which are fraud
df1=df1.select("amount","type","nameOrig","isFraud","isFlaggedFraudNew").where(F.col("isfraud")=="1")
df1.show(5)

+-------+--------+-----------+-------+-----------------+
| amount|    type|   nameOrig|isFraud|isFlaggedFraudNew|
+-------+--------+-----------+-------+-----------------+
|  181.0|TRANSFER|C1305486145|      1|                1|
|  181.0|CASH_OUT| C840083671|      1|                1|
| 2806.0|TRANSFER|C1420196421|      1|                1|
| 2806.0|CASH_OUT|C2101527076|      1|                1|
|20128.0|TRANSFER| C137533655|      1|                1|
+-------+--------+-----------+-------+-----------------+
only showing top 5 rows



In [204]:
# since we corrected the logic of the flagged column, we can only select the necessary columns
df2 = df1.select('type','amount','isFraud','isFlaggedFraudNew')
df2.show(10)



+--------+----------+-------+-----------------+
|    type|    amount|isFraud|isFlaggedFraudNew|
+--------+----------+-------+-----------------+
|TRANSFER|     181.0|      1|                1|
|CASH_OUT|     181.0|      1|                1|
|TRANSFER|    2806.0|      1|                1|
|CASH_OUT|    2806.0|      1|                1|
|TRANSFER|   20128.0|      1|                1|
|CASH_OUT|   20128.0|      1|                1|
|CASH_OUT| 416001.33|      1|                1|
|TRANSFER|1277212.77|      1|                1|
|CASH_OUT|1277212.77|      1|                1|
|TRANSFER|  35063.63|      1|                1|
+--------+----------+-------+-----------------+
only showing top 10 rows



In [205]:
# since we corrected the logic of the flagged column, we can only select the necessary columns
df2 = df1.select("type","amount","isFraud","isFlaggedFraudNew")
df2.show(10)

+--------+----------+-------+-----------------+
|    type|    amount|isFraud|isFlaggedFraudNew|
+--------+----------+-------+-----------------+
|TRANSFER|     181.0|      1|                1|
|CASH_OUT|     181.0|      1|                1|
|TRANSFER|    2806.0|      1|                1|
|CASH_OUT|    2806.0|      1|                1|
|TRANSFER|   20128.0|      1|                1|
|CASH_OUT|   20128.0|      1|                1|
|CASH_OUT| 416001.33|      1|                1|
|TRANSFER|1277212.77|      1|                1|
|CASH_OUT|1277212.77|      1|                1|
|TRANSFER|  35063.63|      1|                1|
+--------+----------+-------+-----------------+
only showing top 10 rows



In [206]:
#Getting the max amount of fraud happened
df2.filter(F.col("isFraud") == "1").agg(F.max("amount")).show()

+-----------+
|max(amount)|
+-----------+
|      1.0E7|
+-----------+



In [207]:
# The most fraud transactions were occured when the amount is 1.0E7 are in Transfer and Cash Out types coz one cannot transfer or wihtdraw such a huge amount
df2.filter(F.col("amount")=="1.0E7").show(5)

+--------+------+-------+-----------------+
|    type|amount|isFraud|isFlaggedFraudNew|
+--------+------+-------+-----------------+
|TRANSFER| 1.0E7|      1|                1|
|CASH_OUT| 1.0E7|      1|                1|
|TRANSFER| 1.0E7|      1|                1|
|CASH_OUT| 1.0E7|      1|                1|
|TRANSFER| 1.0E7|      1|                1|
+--------+------+-------+-----------------+
only showing top 5 rows



In [208]:
result = df2.filter(F.col("isFraud") == "1").agg(F.max("amount").cast("double").alias("max_amount"))
result.select(format_number("max_amount", 0).alias("max_amount")).show()


+----------+
|max_amount|
+----------+
|10,000,000|
+----------+



In [209]:
df = df.select("amount","type","oldbalanceOrg","newbalanceOrig","isFraud")

In [210]:
train, test = df.randomSplit([0.7, 0.3], seed=5)

In [211]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4453538 records
Test set length: 1909082 records


In [212]:
train.show(5,False)

+------+-------+-------------+--------------+-------+
|amount|type   |oldbalanceOrg|newbalanceOrig|isFraud|
+------+-------+-------------+--------------+-------+
|0.06  |PAYMENT|0.0          |0.0           |0      |
|0.1   |PAYMENT|0.0          |0.0           |0      |
|0.2   |PAYMENT|573881.0     |573880.8      |0      |
|0.21  |PAYMENT|0.0          |0.0           |0      |
|0.24  |PAYMENT|14235.0      |14234.76      |0      |
+------+-------+-------------+--------------+-------+
only showing top 5 rows



In [213]:
categoricalcols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numericalcols = [ x for (x, dataType) in train.dtypes if (dataType == "double")]

In [214]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 373178|
| CASH_IN| 979783|
|CASH_OUT|1566207|
| PAYMENT|1505319|
|   DEBIT|  29051|
+--------+-------+



In [215]:
indexes = [StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip") for x in categoricalcols]

In [216]:
indexfit=indexes[0].fit(df).transform(df)
indexfit.show()

+---------+--------+-------------+--------------+-------+------------------+
|   amount|    type|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|
+---------+--------+-------------+--------------+-------+------------------+
|  9839.64| PAYMENT|     170136.0|     160296.36|      0|               1.0|
|  1864.28| PAYMENT|      21249.0|      19384.72|      0|               1.0|
|    181.0|TRANSFER|        181.0|           0.0|      1|               3.0|
|    181.0|CASH_OUT|        181.0|           0.0|      1|               0.0|
| 11668.14| PAYMENT|      41554.0|      29885.86|      0|               1.0|
|  7817.71| PAYMENT|      53860.0|      46042.29|      0|               1.0|
|  7107.77| PAYMENT|     183195.0|     176087.23|      0|               1.0|
|  7861.64| PAYMENT|    176087.23|     168225.59|      0|               1.0|
|  4024.36| PAYMENT|       2671.0|           0.0|      0|               1.0|
|  5337.77|   DEBIT|      41720.0|      36382.23|      0|               4.0|

In [217]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in categoricalcols],
        outputCols=[f"{x}_OneHotEncoder" for x in categoricalcols],
    )
]

In [218]:
encoded=one_hot_encoder[0].fit(indexfit).transform(indexfit)
encoded.show()

+---------+--------+-------------+--------------+-------+------------------+------------------+
|   amount|    type|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|
+---------+--------+-------------+--------------+-------+------------------+------------------+
|  9839.64| PAYMENT|     170136.0|     160296.36|      0|               1.0|     (4,[1],[1.0])|
|  1864.28| PAYMENT|      21249.0|      19384.72|      0|               1.0|     (4,[1],[1.0])|
|    181.0|TRANSFER|        181.0|           0.0|      1|               3.0|     (4,[3],[1.0])|
|    181.0|CASH_OUT|        181.0|           0.0|      1|               0.0|     (4,[0],[1.0])|
| 11668.14| PAYMENT|      41554.0|      29885.86|      0|               1.0|     (4,[1],[1.0])|
|  7817.71| PAYMENT|      53860.0|      46042.29|      0|               1.0|     (4,[1],[1.0])|
|  7107.77| PAYMENT|     183195.0|     176087.23|      0|               1.0|     (4,[1],[1.0])|
|  7861.64| PAYMENT|    176087.23|     1

In [219]:
modelInput = [x for x in numericalcols]
modelInput += [f"{x}_OneHotEncoder" for x in categoricalcols]
modelInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [220]:
#Combines the input value columns into a single vector
vector_assembler = VectorAssembler(inputCols=modelInput, outputCol="VectorAssembler_output")

In [221]:
stages = []
stages += indexes
stages += one_hot_encoder
stages += [vector_assembler]
stages

[StringIndexer_b66d6348c0de,
 OneHotEncoder_2142db70cead,
 VectorAssembler_731928acf933]

In [222]:
# Here the above the 3 models are inserted into a pipeline so when we call this pipeline the 3 models are will run automatically
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pipeline_df = model.transform(train)

In [223]:
pipeline_df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_output").show(truncate=False)

+--------+------+-------------+--------------+--------------------------------------------+
|type    |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_output                      |
+--------+------+-------------+--------------+--------------------------------------------+
|PAYMENT |0.06  |0.0          |0.0           |(7,[0,4],[0.06,1.0])                        |
|PAYMENT |0.1   |0.0          |0.0           |(7,[0,4],[0.1,1.0])                         |
|PAYMENT |0.2   |573881.0     |573880.8      |[0.2,573881.0,573880.8,0.0,1.0,0.0,0.0]     |
|PAYMENT |0.21  |0.0          |0.0           |(7,[0,4],[0.21,1.0])                        |
|PAYMENT |0.24  |14235.0      |14234.76      |[0.24,14235.0,14234.76,0.0,1.0,0.0,0.0]     |
|PAYMENT |0.26  |242384.97    |242384.7      |[0.26,242384.97,242384.7,0.0,1.0,0.0,0.0]   |
|PAYMENT |0.32  |0.0          |0.0           |(7,[0,4],[0.32,1.0])                        |
|CASH_OUT|0.37  |25032.0      |25031.63      |[0.37,25032.0,25031.63,1.0,0.0,0.0

In [224]:
pipeline_df.show()

+------+--------+-------------+--------------+-------+------------------+------------------+----------------------+
|amount|    type|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_output|
+------+--------+-------------+--------------+-------+------------------+------------------+----------------------+
|  0.06| PAYMENT|          0.0|           0.0|      0|               1.0|     (4,[1],[1.0])|  (7,[0,4],[0.06,1.0])|
|   0.1| PAYMENT|          0.0|           0.0|      0|               1.0|     (4,[1],[1.0])|   (7,[0,4],[0.1,1.0])|
|   0.2| PAYMENT|     573881.0|      573880.8|      0|               1.0|     (4,[1],[1.0])|  [0.2,573881.0,573...|
|  0.21| PAYMENT|          0.0|           0.0|      0|               1.0|     (4,[1],[1.0])|  (7,[0,4],[0.21,1.0])|
|  0.24| PAYMENT|      14235.0|      14234.76|      0|               1.0|     (4,[1],[1.0])|  [0.24,14235.0,142...|
|  0.26| PAYMENT|    242384.97|      242384.7|      0|               1.0

In [225]:
df_test=test.where(test.isFraud == 1)

In [226]:
from pyspark.ml.classification import LogisticRegression

In [227]:
data = pipeline_df.select(F.col("VectorAssembler_output").alias("features"),F.col("isFraud").alias("label"))

In [228]:
data.show(5, truncate=False)

+---------------------------------------+-----+
|features                               |label|
+---------------------------------------+-----+
|(7,[0,4],[0.06,1.0])                   |0    |
|(7,[0,4],[0.1,1.0])                    |0    |
|[0.2,573881.0,573880.8,0.0,1.0,0.0,0.0]|0    |
|(7,[0,4],[0.21,1.0])                   |0    |
|[0.24,14235.0,14234.76,0.0,1.0,0.0,0.0]|0    |
+---------------------------------------+-----+
only showing top 5 rows



In [229]:
model = LogisticRegression().fit(data)
data=model.transform(data)
data.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(7,[0,4],[0.06,1.0])|    0|[168.455849824362...|           [1.0,0.0]|       0.0|
| (7,[0,4],[0.1,1.0])|    0|[168.455850634217...|           [1.0,0.0]|       0.0|
|[0.2,573881.0,573...|    0|[169.574438816169...|           [1.0,0.0]|       0.0|
|(7,[0,4],[0.21,1.0])|    0|[168.455852861320...|           [1.0,0.0]|       0.0|
|[0.24,14235.0,142...|    0|[168.483592902588...|           [1.0,0.0]|       0.0|
|[0.26,242384.97,2...|    0|[168.928295655524...|           [1.0,0.0]|       0.0|
|(7,[0,4],[0.32,1.0])|    0|[168.455855088424...|           [1.0,0.0]|       0.0|
|[0.37,25032.0,250...|    0|[5.92479631814167...|[0.99733477049186...|       0.0|
|[0.41,40610.0,406...|    0|[168.535000754647...|           [1.0,0.0]|       0.0|
|(7,[0,4],[0.42,

In [230]:
model = pipeline.fit(df_test)
pipeline_df_test = model.transform(df_test)

In [231]:
data_test = pipeline_df_test.select(F.col("VectorAssembler_output").alias("features"),F.col("isFraud").alias("label"))
data_test.show(5,False)

+---------------------+-----+
|features             |label|
+---------------------+-----+
|[119.0,119.0,0.0,1.0]|1    |
|[164.0,164.0,0.0,1.0]|1    |
|[170.0,170.0,0.0,1.0]|1    |
|[408.0,408.0,0.0,0.0]|1    |
|[636.0,636.0,0.0,1.0]|1    |
+---------------------+-----+
only showing top 5 rows



In [232]:
model = LogisticRegression().fit(data_test)
data=model.transform(data_test)
data.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[119.0,119.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[164.0,164.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[170.0,170.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[408.0,408.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[636.0,636.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[1055.0,1055.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[1996.17,1996.17,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[2007.0,2007.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[4530.71,4530.71,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[5624.02,5624.02,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[6339.42,6339.42,...|    1|[-Infinity,Infinity]|  

In [234]:
model.summary.areaUnderROC

1.0

In [235]:
model.summary.pr.show()

+------+---------+
|recall|precision|
+------+---------+
|   0.0|      1.0|
|   1.0|      1.0|
+------+---------+



In [236]:
# Reduced the features to 2 principal components
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model_pca = pca.fit(data_test)
transformed_data_pca = model_pca.transform(data_test)
transformed_data_pca.show(5, False)


+---------------------+-----+---------------------------------------+
|features             |label|pca_features                           |
+---------------------+-----+---------------------------------------+
|[119.0,119.0,0.0,1.0]|1    |[-151.83645873731683,72.57885174559738]|
|[164.0,164.0,0.0,1.0]|1    |[-209.2536069968608,100.02463602714336]|
|[170.0,170.0,0.0,1.0]|1    |[-216.9092267648,103.68407393134949]   |
|[408.0,408.0,0.0,0.0]|1    |[-520.5821442198653,248.84177748601692]|
|[636.0,636.0,0.0,1.0]|1    |[-811.4956954080774,387.90041782469234]|
+---------------------+-----+---------------------------------------+
only showing top 5 rows



In [237]:
model_lr_pca = LogisticRegression().fit(transformed_data_pca)
data_lr_pca = model_lr_pca.transform(transformed_data_pca)
data_lr_pca.show()


+--------------------+-----+--------------------+--------------------+-----------+----------+
|            features|label|        pca_features|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+--------------------+-----------+----------+
|[119.0,119.0,0.0,...|    1|[-151.83645873731...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[164.0,164.0,0.0,...|    1|[-209.25360699686...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[170.0,170.0,0.0,...|    1|[-216.9092267648,...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[408.0,408.0,0.0,...|    1|[-520.58214421986...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[636.0,636.0,0.0,...|    1|[-811.49569540807...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[1055.0,1055.0,0....|    1|[-1346.1131425358...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[1996.17,1996.17,...|    1|[-2546.9864186945...|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[2007.0,2007.0,0....|    1|[-2560.8048123756...|[-Infinity,

In [238]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
area_under_roc = evaluator.evaluate(data_lr_pca)
print("Area Under ROC for PCA-Transformed Data:", area_under_roc)


Area Under ROC for PCA-Transformed Data: 1.0
