# Fraud detection using a Random Forest model

Dr Jose M. Albornoz, May 2019

In this notebook I will build a classifier for fraud detection purposes using a Random Forest model. The dataset can be found at https://www.kaggle.com/ntnu-testimon/paysim1

In [1]:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator  
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator  
import org.apache.spark.mllib.evaluation.MulticlassMetrics  
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics  
import org.apache.spark.ml.classification.RandomForestClassifier  
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit, CrossValidator}  
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoderEstimator}  
import org.apache.spark.ml.linalg.Vectors  
import org.apache.spark.ml.Pipeline  
import org.apache.log4j._  
Logger.getLogger("org").setLevel(Level.ERROR) 

Intitializing Scala interpreter ...

Spark Web UI available at http://DESKTOP-FQ2BOOJ:4040
SparkContext available as 'sc' (version = 2.4.0, master = local[*], app id = local-1559745571045)
SparkSession available as 'spark'


import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit, CrossValidator}
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoderEstimator}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.Pipeline
import org.apache.log4j._


# 1.- Load data

In [2]:
val data = spark.read.option("header","true").option("inferSchema","true").format("csv").load("Data/paysim.csv")

data: org.apache.spark.sql.DataFrame = [step: int, type: string ... 9 more fields]


In [3]:
data.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [4]:
val data_length = data.count

data_length: Long = 6362620


# 2.- Exploratory Data Analysis

## 2.1.- Class balance

In [5]:
val numCasesFraud = data.filter($"isFraud" === 1).count

numCasesFraud: Long = 8213


In [6]:
val numCasesNotFraud = data.filter($"isFraud" === 0).count

numCasesNotFraud: Long = 6354407


In [7]:
val numCasesFraudPercent = numCasesFraud.toFloat*100/data_length

numCasesFraudPercent: Float = 0.12908204


In [8]:
val numCasesNotFraudPercent = numCasesNotFraud.toFloat*100/data_length

numCasesNotFraudPercent: Float = 99.87092


## 2.2.- Missing values

In [9]:
val data_null_count = data.select(data.columns.map(colName => {count(when(col(colName).isNull, true)) as s"${colName}_nulls_count"}): _*)

data_null_count: org.apache.spark.sql.DataFrame = [step_nulls_count: bigint, type_nulls_count: bigint ... 9 more fields]


In [10]:
data_null_count.show

+----------------+----------------+------------------+--------------------+-------------------------+--------------------------+--------------------+--------------------------+--------------------------+-------------------+--------------------------+
|step_nulls_count|type_nulls_count|amount_nulls_count|nameOrig_nulls_count|oldbalanceOrg_nulls_count|newbalanceOrig_nulls_count|nameDest_nulls_count|oldbalanceDest_nulls_count|newbalanceDest_nulls_count|isFraud_nulls_count|isFlaggedFraud_nulls_count|
+----------------+----------------+------------------+--------------------+-------------------------+--------------------------+--------------------+--------------------------+--------------------------+-------------------+--------------------------+
|               0|               0|                 0|                   0|                        0|                         0|                   0|                         0|                         0|                  0|                        

There are no missing values in this dataset

# 3.- Feature Engineering

## 3.1.- Transactions that empty an account, zero initial balance in destination account, and balance differences

In [11]:
val data1 = data.withColumn("emptiedAccount", when($"amount" === $"oldbalanceOrg", lit(1)).otherwise(lit(0))).
                 withColumn("zeroBalance", when($"oldbalanceDest" === 0, lit(1)).otherwise(lit(0))).
                 withColumn("originBalanceDiff", $"newbalanceOrig" - $"oldbalanceOrg").
                 withColumn("recipientBalanceDiff", $"newbalanceDest" - $"oldbalanceDest")

data1: org.apache.spark.sql.DataFrame = [step: int, type: string ... 13 more fields]


# 4.- Train-test split

## 4.1.- Drop irrelevant columns, shuffle dataset, change label column name

In [12]:
val colsToRemove = Seq("step", "type", "nameOrig", "nameDest", "isFlaggedFraud") 

colsToRemove: Seq[String] = List(step, type, nameOrig, nameDest, isFlaggedFraud)


In [13]:
import org.apache.spark.sql.functions.rand
val data2 = data1.drop(colsToRemove:_*).orderBy(rand()).orderBy(rand()).withColumnRenamed("isFraud", "label")

import org.apache.spark.sql.functions.rand
data2: org.apache.spark.sql.DataFrame = [amount: double, oldbalanceOrg: double ... 8 more fields]


In [14]:
data2.printSchema

root
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- emptiedAccount: integer (nullable = false)
 |-- zeroBalance: integer (nullable = false)
 |-- originBalanceDiff: double (nullable = true)
 |-- recipientBalanceDiff: double (nullable = true)



## 4.2.- Assemble features vector

In [15]:
// Set the input columns as the features we want to use
val assembler = (new VectorAssembler().setInputCols(Array("amount", "oldbalanceOrg", 
    "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "emptiedAccount", "zeroBalance", 
    "originBalanceDiff", "recipientBalanceDiff")).
   setOutputCol("features"))

// Transform the DataFrame
val output = assembler.transform(data2).select($"label",$"features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_204cfe7000e4
output: org.apache.spark.sql.DataFrame = [label: int, features: vector]


In [16]:
// Splitting the data by create an array of the training and test data
val Array(training, test) = output.select("label","features").randomSplit(Array(0.7, 0.3), seed = 801)

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, features: vector]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, features: vector]


# 5.- Grid Search

I will now create a model object (I’m using a Random Forest Classifier), define a parameter grid, create a Cross Validator object (here is where we set our scoring metric for training the model) and fit the model.

In [17]:
// create the model
val rf = new RandomForestClassifier()

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_50246d962f4e


In [18]:
// create the param grid
val paramGrid = new ParamGridBuilder().addGrid(rf.maxBins, Array(1000)).
                                       addGrid(rf.minInstancesPerNode, Array(30)).
                                       addGrid(rf.numTrees, Array(30)).
                                       addGrid(rf.maxDepth, Array(5)).build()

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_50246d962f4e-maxBins: 1000,
	rfc_50246d962f4e-maxDepth: 5,
	rfc_50246d962f4e-minInstancesPerNode: 30,
	rfc_50246d962f4e-numTrees: 30
})


In [19]:
// create cross val object, define scoring metric
val cv = new CrossValidator().
  setEstimator(rf).
  setEvaluator(new BinaryClassificationEvaluator().setMetricName("areaUnderPR")).
  setEstimatorParamMaps(paramGrid).
  setNumFolds(10).
setParallelism(2)

cv: org.apache.spark.ml.tuning.CrossValidator = cv_35592471fe00


## 5.1.- Model training

In [20]:
// You can then treat this object as the model and use fit on it.
val model = cv.fit(training)

model: org.apache.spark.ml.tuning.CrossValidatorModel = cv_35592471fe00


In [21]:
model.avgMetrics

res4: Array[Double] = Array(0.9965534672046898)


In [22]:
model.bestModel 

res5: org.apache.spark.ml.Model[_] = RandomForestClassificationModel (uid=rfc_50246d962f4e) with 30 trees


# 4.- Model evaluation

This is a little more difficult because the evaluation functionality still mostly resides in the RDD-API for Spark, requiring some different syntax. Let’s begin by getting predictions on our test data and storing them.

In [23]:
val results = model.transform(test).select("features", "label", "prediction")

results: org.apache.spark.sql.DataFrame = [features: vector, label: int ... 1 more field]


In [24]:
results.show

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(9,[0,1,6,7],[327...|    0|       0.0|
|(9,[0,1,6,7],[368...|    0|       0.0|
|(9,[0,1,6,7],[464...|    0|       0.0|
|(9,[0,1,6,7],[522...|    0|       0.0|
|(9,[0,1,6,7],[565...|    0|       0.0|
|(9,[0,1,6,7],[566...|    0|       0.0|
|(9,[0,1,6,7],[636...|    0|       0.0|
|(9,[0,1,6,7],[864...|    0|       0.0|
|(9,[0,1,6,7],[951...|    0|       0.0|
|(9,[0,1,6,7],[117...|    0|       0.0|
|(9,[0,1,6,7],[122...|    0|       0.0|
|(9,[0,1,6,7],[147...|    0|       0.0|
|(9,[0,1,6,7],[152...|    0|       0.0|
|(9,[0,1,6,7],[155...|    0|       0.0|
|(9,[0,1,6,7],[161...|    0|       0.0|
|(9,[0,1,6,7],[169...|    0|       0.0|
|(9,[0,1,6,7],[178...|    0|       0.0|
|(9,[0,1,6,7],[181...|    0|       0.0|
|(9,[0,1,6,7],[182...|    0|       0.0|
|(9,[0,1,6,7],[187...|    0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows



We will then convert these results to an RDD.

In [25]:
val predictionAndLabels = results.select($"prediction",$"label").as[(Double, Double)].rdd

predictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[906] at rdd at <console>:39


We now create our metrics objects and print out the confusion matrix.

In [26]:
// Instantiate a new metrics objects
val bMetrics = new BinaryClassificationMetrics(predictionAndLabels)
val mMetrics = new MulticlassMetrics(predictionAndLabels)
val labels = mMetrics.labels

// Print out the Confusion matrix
println("Confusion matrix:")
println(mMetrics.confusionMatrix)

Confusion matrix:
1905661.0  0.0     
14.0       2464.0  


bMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@6bc42e44
mMetrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@99e0171
labels: Array[Double] = Array(0.0, 1.0)


We will now use the numbers in the confusion matrix to calculate some useful metrics.

In [27]:
// Precision by label
labels.foreach { l =>
  println(s"Precision($l) = " + mMetrics.precision(l))
}

// Recall by label
labels.foreach { l =>
  println(s"Recall($l) = " + mMetrics.recall(l))
}

// False positive rate by label
labels.foreach { l =>
  println(s"FPR($l) = " + mMetrics.falsePositiveRate(l))
}

// F-measure by label
labels.foreach { l =>
  println(s"F1-Score($l) = " + mMetrics.fMeasure(l))
}

Precision(0.0) = 0.9999926535217181
Precision(1.0) = 1.0
Recall(0.0) = 1.0
Recall(1.0) = 0.9943502824858758
FPR(0.0) = 0.005649717514124294
FPR(1.0) = 0.0
F1-Score(0.0) = 0.9999963267473663
F1-Score(1.0) = 0.9971671388101983


In [28]:
// Precision by threshold
val precision = bMetrics.precisionByThreshold
precision.foreach { case (t, p) =>
  println(s"Threshold: $t, Precision: $p")
}

// Recall by threshold
val recall = bMetrics.recallByThreshold
recall.foreach { case (t, r) =>
  println(s"Threshold: $t, Recall: $r")
}

// Precision-Recall Curve
val PRC = bMetrics.pr

// F-measure
val f1Score = bMetrics.fMeasureByThreshold
f1Score.foreach { case (t, f) =>
  println(s"Threshold: $t, F-score: $f, Beta = 1")
}

Threshold: 1.0, Precision: 1.0
Threshold: 0.0, Precision: 0.0012986475303947984
Threshold: 1.0, Recall: 0.9943502824858758
Threshold: 0.0, Recall: 1.0
Threshold: 0.0, F-score: 0.002593926464592328, Beta = 1
Threshold: 1.0, F-score: 0.9971671388101983, Beta = 1


precision: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[925] at map at BinaryClassificationMetrics.scala:214
recall: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[926] at map at BinaryClassificationMetrics.scala:214
PRC: org.apache.spark.rdd.RDD[(Double, Double)] = UnionRDD[929] at union at BinaryClassificationMetrics.scala:110
f1Score: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[930] at map at BinaryClassificationMetrics.scala:214


In [29]:
val beta = 0.5
val fScore = bMetrics.fMeasureByThreshold(beta)
f1Score.foreach { case (t, f) =>
  println(s"Threshold: $t, F-score: $f, Beta = 0.5")
}

// AUPRC
val auPRC = bMetrics.areaUnderPR
println("Area under precision-recall curve = " + auPRC)

// Compute thresholds used in ROC and PR curves
val thresholds = precision.map(_._1)

// ROC Curve
val roc = bMetrics.roc

// AUROC
val auROC = bMetrics.areaUnderROC
println("Area under ROC = " + auROC)

Threshold: 0.0, F-score: 0.002593926464592328, Beta = 0.5
Threshold: 1.0, F-score: 0.9971671388101983, Beta = 0.5
Area under precision-recall curve = 0.9971788097387865
Area under ROC = 0.9971751412429379


beta: Double = 0.5
fScore: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[931] at map at BinaryClassificationMetrics.scala:214
auPRC: Double = 0.9971788097387865
thresholds: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[937] at map at <console>:53
roc: org.apache.spark.rdd.RDD[(Double, Double)] = UnionRDD[941] at UnionRDD at BinaryClassificationMetrics.scala:90
auROC: Double = 0.9971751412429379


# 5.- Do we have a representative test set?

A very important assumption is that the statistical distributions we are learning from are the same in the training and test sets. We will use the are under the ROC curve as a measure of the similitude of these distributions: if they are indeed similar, the area below the ROC should be close to 0.5, indicating non-separability between the "training" and "testing" classes.

In [30]:
// renaming the "label" column
val data2_ver = data2.withColumnRenamed("label", "label_orig")

data2_ver: org.apache.spark.sql.DataFrame = [amount: double, oldbalanceOrg: double ... 8 more fields]


In [31]:
// train/test split for verification purposes
val Array(train_tmp, test_tmp) = data2_ver.randomSplit(Array(0.7, 0.3), seed = 801)

train_tmp: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [amount: double, oldbalanceOrg: double ... 8 more fields]
test_tmp: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [amount: double, oldbalanceOrg: double ... 8 more fields]


In [32]:
// add a "fake label" to distinguish train & test sets
val train_ver = train_tmp.withColumn("label", lit(1))
val test_ver = test_tmp.withColumn("label", lit(0))

train_ver: org.apache.spark.sql.DataFrame = [amount: double, oldbalanceOrg: double ... 9 more fields]
test_ver: org.apache.spark.sql.DataFrame = [amount: double, oldbalanceOrg: double ... 9 more fields]


In [33]:
// append train and test sets
val df_ver0 = train_ver.unionAll(test_ver).orderBy(rand()).orderBy(rand())

df_ver0: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [amount: double, oldbalanceOrg: double ... 9 more fields]


In [34]:
// assemble features vector
val assembler = (new VectorAssembler().setInputCols(Array("amount", "oldbalanceOrg", 
    "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "emptiedAccount", "zeroBalance", 
    "originBalanceDiff", "recipientBalanceDiff", "label_orig")).
   setOutputCol("features"))

// Transform the DataFrame
val df_ver1 = assembler.transform(df_ver0).select($"label",$"features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_be13c94415cd
df_ver1: org.apache.spark.sql.DataFrame = [label: int, features: vector]


In [35]:
// invoke random forests classifier
val rf_ver = new RandomForestClassifier()

val paramGrid_ver = new ParamGridBuilder().addGrid(rf.numTrees, Array(20)).addGrid(rf.maxBins, Array(100)).build()

rf_ver: org.apache.spark.ml.classification.RandomForestClassifier = rfc_d04989db0114
paramGrid_ver: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_50246d962f4e-maxBins: 100,
	rfc_50246d962f4e-numTrees: 20
})


In [36]:
val cv_ver = new CrossValidator().
  setEstimator(rf_ver).
  setEvaluator(new BinaryClassificationEvaluator().setMetricName("areaUnderROC")).
  setEstimatorParamMaps(paramGrid_ver).
  setNumFolds(5).
  setParallelism(2)

cv_ver: org.apache.spark.ml.tuning.CrossValidator = cv_c01a65de6d2d


In [37]:
val model_ver = cv_ver.fit(df_ver1)

model_ver: org.apache.spark.ml.tuning.CrossValidatorModel = cv_c01a65de6d2d


In [38]:
model_ver.avgMetrics

res11: Array[Double] = Array(0.499694051371576)


The training and test sets have similar statistical distributions