# Fraud Detection Using PySpark

In [27]:
import pyspark
from pyspark.sql.functions import when, col, explode, array, lit
import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('FraudDetection').getOrCreate()

In [28]:
sparkframe = spark.read.csv(r'C:\Users\Jiaming\csv files\fraud_detection.csv',inferSchema = True, header = True)

In [29]:
sparkframe.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [30]:
sparkframe.groupBy('isFraud').count().show()

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+



Unbalanced data in dependent variable.

In [31]:
sparkframe.crosstab("type","isFraud").show()

+------------+-------+----+
|type_isFraud|      0|   1|
+------------+-------+----+
|    TRANSFER| 528812|4097|
|       DEBIT|  41432|   0|
|     CASH_IN|1399284|   0|
|    CASH_OUT|2233384|4116|
|     PAYMENT|2151495|   0|
+------------+-------+----+



Fraudulent activity is coming from cash out and transfers.

In [32]:
sparkframe.limit(50).toPandas()

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
3,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0
5,1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0
6,1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0
7,1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0
8,1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0
9,1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0


In [33]:
sparkframe = sparkframe.withColumn('fraud_or_not', F.when((F.col("type") == 'CASH_OUT') & (F.col("isFraud") == 1),"FRAUD_CASH_OUT")\
                                  .when((F.col("type") == "TRANSFER")&(F.col("isFraud") == 1),"FRAUD_TRANSFER")\
                                 .otherwise("NO_FRAUD_ALERT"))

        

In [34]:
sparkframe.groupBy('fraud_or_not').count().show()

+--------------+-------+
|  fraud_or_not|  count|
+--------------+-------+
|FRAUD_CASH_OUT|   4116|
|FRAUD_TRANSFER|   4097|
|NO_FRAUD_ALERT|6354407|
+--------------+-------+



In [35]:
# Creating dummy variables.
categ = sparkframe.select('fraud_or_not').distinct().rdd.flatMap(lambda x:x).collect()
exprs = [F.when(F.col('fraud_or_not') == cat,1).otherwise(0)\
            .alias(str(cat)) for cat in categ]
sparkframe = sparkframe.select(exprs+sparkframe.columns)

In [36]:
sparkframe.limit(4).toPandas()

Unnamed: 0,FRAUD_CASH_OUT,FRAUD_TRANSFER,NO_FRAUD_ALERT,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,fraud_or_not
0,0,0,1,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0,NO_FRAUD_ALERT
1,0,0,1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0,NO_FRAUD_ALERT
2,0,1,0,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0,FRAUD_TRANSFER
3,1,0,0,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0,FRAUD_CASH_OUT


In [37]:
# Dropping one of the dummy variables to prevent multicollinearity as well as other unnecessary columns.
drop_list = ['NO_FRAUD_ALERT','fraud_or_not','nameOrig','nameDest','isFlaggedFraud','type']

sparkframe = sparkframe.drop(*drop_list)


In [38]:
# Renaming indepedent variables , "isFraud", to "label" for machine learning purposes.
sparkframe = sparkframe.withColumnRenamed("isFraud", "label")

In [39]:
sparkframe.limit(4).toPandas()

Unnamed: 0,FRAUD_CASH_OUT,FRAUD_TRANSFER,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,label
0,0,0,1,9839.64,170136.0,160296.36,0.0,0.0,0
1,0,0,1,1864.28,21249.0,19384.72,0.0,0.0,0
2,0,1,1,181.0,181.0,0.0,0.0,0.0,1
3,1,0,1,181.0,181.0,0.0,21182.0,0.0,1


In [40]:
#Resampling dataset to balance out the unbalanced dependent variable.
major_df = sparkframe.filter(col("label") == 0)
minor_df = sparkframe.filter(col("label") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

ratio: 773


In [41]:
a = range(ratio)
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')
combined_df = major_df.unionAll(oversampled_df)

In [42]:
combined_df.count()

12703056

# Machine Learning / Data Transformations

In [43]:
input_columns = combined_df.columns
input_columns

['FRAUD_CASH_OUT',
 'FRAUD_TRANSFER',
 'step',
 'amount',
 'oldbalanceOrg',
 'newbalanceOrig',
 'oldbalanceDest',
 'newbalanceDest',
 'label']

In [44]:
combined_df.printSchema()

root
 |-- FRAUD_CASH_OUT: integer (nullable = false)
 |-- FRAUD_TRANSFER: integer (nullable = false)
 |-- step: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- label: integer (nullable = true)



In [45]:
#Converting string columns into double type.
string_columns = input_columns[2:]

for column in string_columns:
    combined_df = combined_df.withColumn(column, combined_df[column].cast(DoubleType()))
        

In [46]:
combined_df.printSchema()

root
 |-- FRAUD_CASH_OUT: integer (nullable = false)
 |-- FRAUD_TRANSFER: integer (nullable = false)
 |-- step: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- label: double (nullable = true)



In [47]:

input_columns = input_columns[2:-1]
independent_var = "label"

input_columns

['step',
 'amount',
 'oldbalanceOrg',
 'newbalanceOrig',
 'oldbalanceDest',
 'newbalanceDest']

In [48]:
d = {}

for col in input_columns:
    d[col] = combined_df.approxQuantile(col,[0.01,0.99],0.25)
  
# checking skewness

for col in input_columns:
    skew = combined_df.agg(skewness(combined_df[col])).collect()
    skew = skew[0][0]
    
    if skew > 1:
        combined_df = combined_df.withColumn(col,\
                                            log(when(combined_df[col] < d[col][0], d[col][0])\
                                               .when(combined_df[col] > d[col][1], d[col][1])\
                                               .otherwise(combined_df[col])+ 1).alias(col))
        print(col, " has been treated for pos skew", skew)
    
    elif skew < -1:
        combined_df = combined_df.withColumn(col,\
                                            exp(when(combined_df[col] < d[col][0], d[col][0])\
                                               .when(combined_df[col] > d[col][1], d[col][1])\
                                               .otherwise(combined_df[col])).alias(col))
       

amount  has been treated for pos skew 3.8764019034596875
oldbalanceOrg  has been treated for pos skew 5.774444220401397
newbalanceOrig  has been treated for pos skew 7.62142143703115
oldbalanceDest  has been treated for pos skew 32.178582900253645
newbalanceDest  has been treated for pos skew 24.637252758925943


In [49]:
features_list = combined_df.columns[:-1]
assembler = VectorAssembler(inputCols = features_list, outputCol = 'features')
final_data = assembler.transform(combined_df).select('features','label')
final_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4],[1.0,8...|  0.0|
|[0.0,0.0,1.0,8.58...|  0.0|
|[0.0,0.0,1.0,9.17...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4],[1.0,9...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|[0.0,0.0,1.0,12.3...|  0.0|
|(8,[2,3,4],[1.0,7...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|(8,[2,3,4,6],[1.0...|  0.0|
|(8,[2,3,4,5],[1.0...|  0.0|
|[0.0,0.0,1.0,9.13...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [50]:
train, test = final_data.randomSplit([0.8,0.2])

In [51]:
# Machine learning portion.

Bin_evaluator = BinaryClassificationEvaluator()


In [52]:
classifier = LogisticRegression(featuresCol = 'features', labelCol = 'label')
fitModel = classifier.fit(train)

predictionAndLabels = fitModel.transform(test)
auc = Bin_evaluator.evaluate(predictionAndLabels)

print("AUC: " ,auc)

AUC:  0.9999805496003189


Model performed well with 99.99% AUC score.