## Anomaly Detection in Credit Card Transactions with Different Classification Algorithms
 
Dataset: Credit Card Fraud Detection https://www.kaggle.com/mlg-ulb/creditcardfraud/data

This dataset presents transactions that occurred over two days, with 492 frauds out of 284,807 transactions. The dataset is highly unbalanced: the positive class (frauds) accounts for 0.172% of all transactions.

It contains only numerical input variables, which are the result of a PCA transformation. Unfortunately, due to confidentiality issues, the original features and further background information about the data are not available. Features V1, V2, ... V28 are the principal components obtained with PCA; the only features that have not been transformed with PCA are 'Time' and 'Amount'. Feature 'Time' contains the seconds elapsed between each transaction and the first transaction in the dataset. Feature 'Amount' is the transaction amount; it can be used, for instance, in example-dependent cost-sensitive learning. Feature 'Class' is the response variable; it takes value 1 in case of fraud and 0 otherwise.
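
The imbalance quoted above can be checked with a little arithmetic. The sketch below uses only the two counts from the dataset description; the object and value names are illustrative and not part of the notebook. It also shows that a model labelling every transaction as normal would already be right about 99.8% of the time, which is why accuracy alone says little on this dataset.

```scala
// Illustrative names; the two counts come from the dataset description.
object ClassImbalance {
  val totalTransactions: Long = 284807L
  val fraudTransactions: Long = 492L

  // Fraction of transactions labelled as fraud (the positive class).
  val fraudFraction: Double = fraudTransactions.toDouble / totalTransactions

  // Accuracy of a trivial model that always predicts "normal" (class 0).
  val trivialAccuracy: Double = 1.0 - fraudFraction
}
```

Evaluating `ClassImbalance.fraudFraction` in a cell gives a value of roughly 0.0017, in line with the percentage stated above.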

#### The implementation uses the DataFrame-based API of Spark MLlib.
 
 #### Algorithms:
 
   - GBTClassifier
   - RandomForestClassifier
   - DecisionTreeClassifier
   - MultilayerPerceptronClassifier
   - NaiveBayes
   - LinearSVC
   - LogisticRegression 

In [1]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col}
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler, StringIndexer, MinMaxScaler}
import org.apache.spark.ml.{Pipeline}
import org.apache.spark.ml.classification.{GBTClassifier, RandomForestClassifier, DecisionTreeClassifier, MultilayerPerceptronClassifier, NaiveBayes, LinearSVC, LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator,BinaryClassificationEvaluator}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import java.io.{File, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Calendar

SparkSession available as 'spark'.



In [2]:
val raw = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").csv("datasets/creditcard.csv")

raw: org.apache.spark.sql.DataFrame = [Time: string, V1: string ... 29 more fields]


In [3]:
// Cast all columns to Double.
val df = raw.select(((1 to 28).map(i => "V" + i) ++ Array("Time", "Amount", "Class")).map(s => col(s).cast("Double")): _*)

df: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 29 more fields]


In [4]:
println("num of records: " + df.count())

num of records: 284807


In [5]:
// select a few columns to show.
df.select("V1", "V2", "Time", "Amount", "Class").show()
println(" Class statistics: 1 represents fraud and 0 represents normal")
df.groupBy("Class").count().show()

+------------------+-------------------+----+------+-----+
|                V1|                 V2|Time|Amount|Class|
+------------------+-------------------+----+------+-----+
|  -1.3598071336738|-0.0727811733098497| 0.0|149.62|  0.0|
|  1.19185711131486|   0.26615071205963| 0.0|  2.69|  0.0|
| -1.35835406159823|  -1.34016307473609| 1.0|378.66|  0.0|
|-0.966271711572087| -0.185226008082898| 1.0| 123.5|  0.0|
| -1.15823309349523|  0.877736754848451| 2.0| 69.99|  0.0|
|-0.425965884412454|  0.960523044882985| 2.0|  3.67|  0.0|
|  1.22965763450793|  0.141003507049326| 4.0|  4.99|  0.0|
|-0.644269442348146|   1.41796354547385| 7.0|  40.8|  0.0|
| -0.89428608220282|  0.286157196276544| 7.0|  93.2|  0.0|
| -0.33826175242575|   1.11959337641566| 9.0|  3.68|  0.0|
|  1.44904378114715|  -1.17633882535966|10.0|   7.8|  0.0|
|  0.38497821518095|  0.616109459176472|10.0|  9.99|  0.0|
|    1.249998742053|  -1.22163680921816|10.0| 121.5|  0.0|
|   1.0693735878819|  0.287722129331455|11.0|  27.5|  0.

In [6]:
val time = df.select("Time")

time: org.apache.spark.sql.DataFrame = [Time: double]


In [7]:
val time0 = df.filter($"Class" === 0.0).select("Time")

time0: org.apache.spark.sql.DataFrame = [Time: double]


In [8]:
val time1 = df.filter($"Class" === 1.0).select("Time")

time1: org.apache.spark.sql.DataFrame = [Time: double]


In [9]:
time.describe("Time").show()

+-------+------------------+
|summary|              Time|
+-------+------------------+
|  count|            284807|
|   mean| 94813.85957508067|
| stddev|47488.145954566215|
|    min|               0.0|
|    max|          172792.0|
+-------+------------------+



In [10]:
time0.describe("Time").show()

+-------+-----------------+
|summary|             Time|
+-------+-----------------+
|  count|           284315|
|   mean|94838.20225805884|
| stddev|47484.01578555081|
|    min|              0.0|
|    max|         172792.0|
+-------+-----------------+



In [11]:
time1.describe("Time").show()

+-------+-----------------+
|summary|             Time|
+-------+-----------------+
|  count|              492|
|   mean|80746.80691056911|
| stddev|47835.36513767506|
|    min|            406.0|
|    max|         170348.0|
+-------+-----------------+



### Step 1. Build an initial pipeline for feature transformation.

In [12]:
// Index the Class column into a numeric label column.
// StringIndexer orders labels by descending frequency by default, so the
// majority class 0 becomes label 0.0 and fraud (1) becomes label 1.0.
val labelConverter = new StringIndexer().setInputCol("Class").setOutputCol("label")
val assembler = new VectorAssembler().setInputCols((1 to 28).map(i => "V" + i).toArray ++ Array("Amount")).setOutputCol("assembled")
val scaler = new MinMaxScaler().setInputCol("assembled").setOutputCol("features")
//val scaler = new StandardScaler().setInputCol("assembled").setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(assembler, scaler, labelConverter))
val pipelineModel = pipeline.fit(df)
val data = pipelineModel.transform(df)
println("Generate feature from raw data:")
data.select("features", "label").show()

labelConverter: org.apache.spark.ml.feature.StringIndexer = strIdx_33ea464affb1
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_799e7dc7a77f
scaler: org.apache.spark.ml.feature.MinMaxScaler = minMaxScal_f25d535fff57
pipeline: org.apache.spark.ml.Pipeline = pipeline_3ba1d85e5b7a
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_3ba1d85e5b7a
data: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 32 more fields]
Generate feature from raw data:
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.93519233743373...|  0.0|
|[0.97854195497169...|  0.0|
|[0.93521702332994...|  0.0|
|[0.94187801720890...|  0.0|
|[0.93861683090479...|  0.0|
|[0.95105714452038...|  0.0|
|[0.97918413907815...|  0.0|
|[0.94734843724736...|  0.0|
|[0.94310096396120...|  0.0|
|[0.95254712917866...|  0.0|
|[0.98291123819341...|  0.0|
|[0.96483408113384...|  0.0|
|[0.97952970932083...|  0.0|
|[0.97646111149632...|  0.0|
|[0.91086362048888
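
For intuition, `MinMaxScaler` rescales each feature to the range [0, 1] by default, using that feature's minimum and maximum. The plain Scala sketch below mirrors the formula for a single column; it is not Spark's implementation, the names are illustrative, and the input values are made up. Since the pipeline above is fitted on the full DataFrame, the minimum and maximum it learns also include rows that later end up in the validation set.

```scala
object MinMaxSketch {
  // Rescale values to [0, 1] with (x - min) / (max - min).
  // A constant column has no spread, so the midpoint 0.5 is used for it here.
  def rescale(values: Seq[Double]): Seq[Double] = {
    require(values.nonEmpty, "need at least one value")
    val lo = values.min
    val hi = values.max
    if (hi == lo) values.map(_ => 0.5)
    else values.map(x => (x - lo) / (hi - lo))
  }

  // Hypothetical amounts, not taken from the dataset.
  val scaled: Seq[Double] = rescale(Seq(0.0, 50.0, 200.0))
}
```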

In [13]:
data.createOrReplaceTempView("creditcard")

In [14]:
//%%sql --maxrows 3
//SELECT count(v1),Time FROM creditcard GROUP BY Class

### Step 2. Split the dataset into training and validation sets.

In [15]:
    val splitTime = data.stat.approxQuantile("Time", Array(0.7), 0.0).head
    val trainingData = data.filter(s"Time<$splitTime").cache()
    val validationData = data.filter(s"Time>=$splitTime").cache()
    println("Split data into Training and Validation: ")
    println("training records count: " + trainingData.count())
    println("validation records count: " + validationData.count())

splitTime: Double = 132929.0
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [V1: double, V2: double ... 32 more fields]
validationData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [V1: double, V2: double ... 32 more fields]
Split data into Training and Validation: 
training records count: 199364
validation records count: 85443
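
The split above is chronological rather than random: everything before the 0.7 quantile of `Time` is used for training and the rest for validation. A minimal plain Scala sketch of the same idea follows. The quantile definition used here (smallest value whose rank reaches `q * n`) is one common choice and an assumption of this sketch, and the timestamps are invented for illustration.

```scala
object TimeSplitSketch {
  // Exact q-quantile of a sorted, non-empty sequence:
  // the smallest value whose 1-based rank is at least q * n.
  def quantile(sorted: IndexedSeq[Double], q: Double): Double = {
    val rank = math.ceil(q * sorted.length).toInt
    sorted(math.max(rank, 1) - 1)
  }

  // Rows strictly before the cut go to training, the rest to validation.
  def splitByTime(times: Seq[Double], q: Double): (Seq[Double], Seq[Double]) = {
    val cut = quantile(times.sorted.toIndexedSeq, q)
    times.partition(_ < cut)
  }

  // Hypothetical timestamps in seconds, for illustration only.
  val (train, validation) =
    splitByTime(Seq(0.0, 1.0, 2.0, 4.0, 7.0, 9.0, 10.0, 11.0), 0.7)
}
```

Because rows equal to the cut value go to the validation side, the training share can end up slightly below the requested quantile, which matches the counts printed above.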


In [16]:
println(" Training set statistics: 1 represents fraud and 0 represents normal")
trainingData.groupBy("Class").count().show()
println(" Validation set statistics: 1 represents fraud and 0 represents normal")
validationData.groupBy("Class").count().show()

 Training set statistics: 1 represents fraud and 0 represents normal
+-----+------+
|Class| count|
+-----+------+
|  0.0|198980|
|  1.0|   384|
+-----+------+

 Validation set statistics: 1 represents fraud and 0 represents normal
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+
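
A chronological split does not preserve the class ratio, so it is worth comparing the fraud rate of the two sets. The sketch below only reuses the counts printed above; the names are illustrative.

```scala
object SplitFraudRates {
  // (normal, fraud) counts reported for each split.
  val training: (Long, Long) = (198980L, 384L)
  val validation: (Long, Long) = (85335L, 108L)

  // Share of fraud among all rows of a split.
  def fraudRate(counts: (Long, Long)): Double = {
    val (normal, fraud) = counts
    fraud.toDouble / (normal + fraud)
  }
}
```

With these counts the training set holds roughly 0.19% fraud and the validation set roughly 0.13%, so the later period is somewhat harder to learn from the earlier one.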



In [17]:
// Grid search over random forest hyperparameters, scored on the validation set.
// Note: the evaluator reads the hard 0/1 "prediction" column instead of "rawPrediction",
// so the reported AUC reflects a single operating point rather than a ranking of scores.
val rfModel = {
  val rfGridSearch = for (
    rfNumTrees <- Array(10, 15, 20);
    rfImpurity <- Array("entropy", "gini");
    rfMaxBins <- Array(24, 28, 32);
    rfMaxDepth <- Array(4, 6, 8))
  yield {
    println(s"Training random forest numTrees : $rfNumTrees, impurity : $rfImpurity, maxBins: $rfMaxBins, maxDepth : $rfMaxDepth")
    val model = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(rfNumTrees).setImpurity(rfImpurity).setMaxDepth(rfMaxDepth).setSeed(42).setMaxBins(rfMaxBins).fit(trainingData)
    val predictionsRF = model.transform(validationData)
    val rfAUC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC").evaluate(predictionsRF)
    println("Area Under ROC Curve = " + rfAUC)
    ((rfNumTrees, rfImpurity, rfMaxBins, rfMaxDepth), model, rfAUC)
  }

  // Rank once by AUC (descending), show the top five, and return the best model
  // so that rfModel holds a model instead of Unit.
  val ranked = rfGridSearch.sortBy(-_._3)
  println(ranked.take(5).mkString("\n"))
  ranked.head._2
}

Training random forest numTrees : 10, impurity : entropy, maxBins: 24, maxDepth : 4
Area Under ROC Curve = 0.8055496962949943
Training random forest numTrees : 10, impurity : entropy, maxBins: 24, maxDepth : 6
Area Under ROC Curve = 0.8518284148096066
Training random forest numTrees : 10, impurity : entropy, maxBins: 24, maxDepth : 8
Area Under ROC Curve = 0.8518284148096066
Training random forest numTrees : 10, impurity : entropy, maxBins: 28, maxDepth : 4
Area Under ROC Curve = 0.8009200666653646
Training random forest numTrees : 10, impurity : entropy, maxBins: 28, maxDepth : 6
Area Under ROC Curve = 0.8564580444392361
Training random forest numTrees : 10, impurity : entropy, maxBins: 28, maxDepth : 8
Area Under ROC Curve = 0.8564521851786748
Training random forest numTrees : 10, impurity : entropy, maxBins: 32, maxDepth : 4
Area Under ROC Curve = 0.7777719185172165
Training random forest numTrees : 10, impurity : entropy, maxBins: 32, maxDepth : 6
Area Under ROC Curve = 0.856458044
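
Stripped of the Spark calls, the search above is an exhaustive loop over a parameter grid that keeps the best-scoring combination. The sketch below shows that pattern in plain Scala. The `score` function is a hypothetical stand-in for fitting a model and evaluating it on the validation set, and its formula has no meaning beyond giving each combination a distinct value.

```scala
object GridSearchSketch {
  // Hypothetical stand-in for "fit a model and score it on validation data".
  def score(numTrees: Int, maxDepth: Int): Double =
    1.0 - 1.0 / (numTrees * maxDepth)

  // Evaluate every combination and keep (parameters, score) pairs.
  val results: Seq[((Int, Int), Double)] =
    for {
      numTrees <- Seq(10, 15, 20)
      maxDepth <- Seq(4, 6, 8)
    } yield ((numTrees, maxDepth), score(numTrees, maxDepth))

  // Best combination by score.
  val best: ((Int, Int), Double) = results.maxBy(_._2)
}
```

Keeping the parameters next to each score, as the notebook does with its tuples, makes it easy to report which setting won.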

In [18]:
//Gradient-boosted tree classifier
// Train a GBT model.
//setImpurity("entropy") 
val t = System.nanoTime
val gbt = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").setMaxIter(10)

t: Long = 27741046400737793
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_a6910f947395


In [19]:
val modelGBT = gbt.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

modelGBT: org.apache.spark.ml.classification.GBTClassificationModel = GBTClassificationModel (uid=gbtc_a6910f947395) with 10 trees
durationtrain: Double = 12.650345152

initial model training finished.
Training process takes 12.650345152 secs


In [20]:
val s = System.nanoTime
// transform is lazy: this times building the query plan, not scoring every row.
val predictionsGBT = modelGBT.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction step finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsGBT.cache()
//val predictionsAndLabel: RDD[Row] = df.rdd= predictionsGBT.select("prediction", "label")
predictionsGBT.select("prediction", "label").show(5)

s: Long = 27741060608260362
predictionsGBT: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 35 more fields]
durationprediction: Double = 0.230977451

prediction step finished.
Prediction process takes 0.230977451 secs
res33: predictionsGBT.type = [V1: double, V2: double ... 35 more fields]
+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 5 rows



In [21]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsGBT.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85366|
|       1.0|   77|
+----------+-----+



In [22]:
println(s"Confusion matrix:")
predictionsGBT.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85328|
|       1.0|  0.0|    7|
|       0.0|  1.0|   38|
|       1.0|  1.0|   70|
+----------+-----+-----+



In [23]:
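// Note: weightedPrecision, weightedRecall and f1 are averaged over both classes by support,
// so the majority class dominates them. The AUC evaluator reads the hard 0/1 prediction column.
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")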
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsGBT)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsGBT))
println("Accuracy = " + evaluator1.evaluate(predictionsGBT))
println("Precision = " + evaluator2.evaluate(predictionsGBT))
println("Recall = " + evaluator3.evaluate(predictionsGBT))
println("F1 = " + evaluator4.evaluate(predictionsGBT))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_ac2db7402e95
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_0c68a22e4696
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_ca926475a32f
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_378c8fb268a6
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_913de66f1a0d
accuracy: Double = 0.9994733330992591
Area Under ROC Curve = 0.8240330592501449
Accuracy = 0.9994733330992591
Precision = 0.999440511423835
Recall = 0.9994733330992592
F1 = 0.9994292547759536
Test Error = 5.266669007408797E-4
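
The weighted precision, recall and F1 printed above average over both classes in proportion to their support, so the 85,335 normal transactions dominate them. For fraud detection the positive-class figures are more telling, and they can be read off the GBT confusion matrix directly. The sketch below is plain Scala with illustrative names. It also shows that an ROC curve built from 0/1 predictions has a single interior point, so its area is the mean of recall and specificity, which reproduces the AUC value reported for this model.

```scala
object FraudClassMetrics {
  // Counts read from the GBT confusion matrix (fraud = positive class).
  val tp = 70.0    // predicted fraud, actually fraud
  val fp = 7.0     // predicted fraud, actually normal
  val fn = 38.0    // predicted normal, actually fraud
  val tn = 85328.0 // predicted normal, actually normal

  val precision: Double = tp / (tp + fp)   // share of fraud alerts that are real
  val recall: Double = tp / (tp + fn)      // share of frauds that are caught
  val specificity: Double = tn / (tn + fp) // share of normal rows left alone

  // Area under the ROC polyline (0,0) -> (1 - specificity, recall) -> (1,1).
  val singlePointAuc: Double = (recall + specificity) / 2.0
}
```

On these counts the fraud-class precision is about 0.91 and the recall about 0.65, a much less flattering picture than the 0.999 values above.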


In [24]:
//val metrics = new BinaryClassificationMetrics(predictionsAndLabel)

In [25]:
// Random forest classifier
// Train a RandomForest model.
val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(10)
val t = System.nanoTime

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_ea807e98f914
t: Long = 27741069986055982


In [26]:
val modelRF = rf.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

modelRF: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_ea807e98f914) with 10 trees
durationtrain: Double = 2.454960518

initial model training finished.
Training process takes 2.454960518 secs


In [27]:
val s = System.nanoTime
// transform is lazy: this times building the query plan, not scoring every row.
val predictionsRF = modelRF.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction step finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsRF.select("prediction", "label", "features").show(5)

s: Long = 27741072901014825
predictionsRF: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 35 more fields]
durationprediction: Double = 0.262056219

prediction step finished.
Prediction process takes 0.262056219 secs
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[0.93273307306919...|
|       0.0|  0.0|[0.99486362369262...|
|       0.0|  0.0|[0.98906514405821...|
|       0.0|  0.0|[0.93238877114205...|
|       0.0|  0.0|[0.95997796495358...|
+----------+-----+--------------------+
only showing top 5 rows



In [28]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsRF.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85367|
|       1.0|   76|
+----------+-----+



In [29]:
println(s"Confusion matrix:")
predictionsRF.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85333|
|       1.0|  0.0|    2|
|       0.0|  1.0|   34|
|       1.0|  1.0|   74|
+----------+-----+-----+



In [30]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsRF)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsRF))
println("Accuracy = " + evaluator1.evaluate(predictionsRF))
println("Precision = " + evaluator2.evaluate(predictionsRF))
println("Recall = " + evaluator3.evaluate(predictionsRF))
println("F1 = " + evaluator4.evaluate(predictionsRF))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_ddcee36a9594
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_0ed014002845
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_33bf5a238976
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_38db9ff81d1a
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_7881ed0c5ff0
accuracy: Double = 0.9995786664794073
Area Under ROC Curve = 0.8425808740714699
Accuracy = 0.9995786664794073
Precision = 0.9995689598879786
Recall = 0.9995786664794073
F1 = 0.9995420682738061
Test Error = 4.213335205927038E-4
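
To see why the weighted scores sit so close to 1, the weighted precision can be reproduced by hand from the random forest confusion matrix: each class's precision is weighted by that class's share of the true labels. The sketch below is plain Scala with illustrative names and uses only the counts shown above.

```scala
object WeightedPrecisionSketch {
  // Counts read from the random forest confusion matrix.
  val tn = 85333.0 // predicted normal, actually normal
  val fp = 2.0     // predicted fraud, actually normal
  val fn = 34.0    // predicted normal, actually fraud
  val tp = 74.0    // predicted fraud, actually fraud
  val total: Double = tn + fp + fn + tp

  // Per-class precision: correct predictions of a class / all predictions of that class.
  val precisionNormal: Double = tn / (tn + fn)
  val precisionFraud: Double = tp / (tp + fp)

  // Each class is weighted by its share of the true labels.
  val weightNormal: Double = (tn + fp) / total
  val weightFraud: Double = (tp + fn) / total

  val weightedPrecision: Double =
    weightNormal * precisionNormal + weightFraud * precisionFraud
}
```

The fraud class carries a weight of only about 0.0013, so even a large change in its precision barely moves the weighted figure.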


In [31]:
// Decision Tree classifier
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier().setLabelCol("label").setFeaturesCol("features")
val t = System.nanoTime

dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_094f93b116ff
t: Long = 27741081011018979


In [32]:
val modelDT = dt.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

modelDT: org.apache.spark.ml.classification.DecisionTreeClassificationModel = DecisionTreeClassificationModel (uid=dtc_094f93b116ff) of depth 5 with 31 nodes
durationtrain: Double = 2.62227778

initial model training finished.
Training process takes 2.62227778 secs


In [33]:
val s = System.nanoTime
// transform is lazy: this times building the query plan, not scoring every row.
val predictionsDT = modelDT.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction step finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsDT.select("prediction", "label", "features").show(5)

s: Long = 27741084930243090
predictionsDT: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 35 more fields]
durationprediction: Double = 0.217574857

prediction step finished.
Prediction process takes 0.217574857 secs
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[0.93273307306919...|
|       0.0|  0.0|[0.99486362369262...|
|       0.0|  0.0|[0.98906514405821...|
|       0.0|  0.0|[0.93238877114205...|
|       0.0|  0.0|[0.95997796495358...|
+----------+-----+--------------------+
only showing top 5 rows



In [34]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsDT.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85369|
|       1.0|   74|
+----------+-----+



In [35]:
println(s"Confusion matrix:")
predictionsDT.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85332|
|       1.0|  0.0|    3|
|       0.0|  1.0|   37|
|       1.0|  1.0|   71|
+----------+-----+-----+



In [36]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsDT)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsDT))
println("Accuracy = " + evaluator1.evaluate(predictionsDT))
println("Precision = " + evaluator2.evaluate(predictionsDT))
println("Recall = " + evaluator3.evaluate(predictionsDT))
println("F1 = " + evaluator4.evaluate(predictionsDT))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_59bb5d25ac9e
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_8bf846dfe775
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_1cff0bc00741
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_9d7c95a604d9
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_452790eb4860
accuracy: Double = 0.9995318516437859
Area Under ROC Curve = 0.8286861259220197
Accuracy = 0.9995318516437859
Precision = 0.9995158919706696
Recall = 0.999531851643786
F1 = 0.9994881701223897
Test Error = 4.681483562141153E-4
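
Since the three tree-based models are nearly indistinguishable on the weighted metrics, one way to compare them is the F1 score of the fraud class alone, computed from the three confusion matrices above. This is a sketch in plain Scala; the case class and names are illustrative and not part of the notebook.

```scala
object ModelComparison {
  // Fraud-class counts: true positives, false positives, false negatives.
  final case class Counts(tp: Int, fp: Int, fn: Int) {
    // F1 for the fraud class, i.e. the harmonic mean of its precision and recall.
    def f1: Double = 2.0 * tp / (2.0 * tp + fp + fn)
  }

  // Values read from the confusion matrices of the three models.
  val models: Map[String, Counts] = Map(
    "GBT" -> Counts(tp = 70, fp = 7, fn = 38),
    "RandomForest" -> Counts(tp = 74, fp = 2, fn = 34),
    "DecisionTree" -> Counts(tp = 71, fp = 3, fn = 37)
  )

  // Model names ordered from highest to lowest fraud-class F1.
  val ranking: Seq[String] =
    models.toSeq.sortBy { case (_, counts) => -counts.f1 }.map(_._1)
}
```

On this validation set the random forest comes out ahead (F1 of about 0.80), followed by the decision tree and the GBT model.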


In [37]:
/* val trainingData1 = trainingData.withColumn("label", when(col("label") === 0, -1)
                                   .otherwise(col("label"))
                           );
val validationData1 = validationData.withColumn("label", when(col("label") === 0, -1)
                                   .otherwise(col("label"))
                           );*/

In [38]:
// SVM classifier
// Train an SVM model.
val svm = new LinearSVC().setLabelCol("label").setFeaturesCol("features").setMaxIter(100).setRegParam(0.1) 
val t = System.nanoTime

svm: org.apache.spark.ml.classification.LinearSVC = linearsvc_0e878a735293
t: Long = 27741091756026105


In [39]:
val modelSVM = svm.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

modelSVM: org.apache.spark.ml.classification.LinearSVCModel = linearsvc_0e878a735293
durationtrain: Double = 72.644895558

initial model training finished.
Training process takes 72.644895558 secs


In [40]:
val s = System.nanoTime
// transform is lazy: this times building the query plan, not scoring every row.
val predictionsSVM = modelSVM.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction step finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsSVM.select("prediction", "label", "features").show(5)

s: Long = 27741165310508848
predictionsSVM: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 34 more fields]
durationprediction: Double = 0.209437886

prediction step finished.
Prediction process takes 0.209437886 secs
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[0.93273307306919...|
|       0.0|  0.0|[0.99486362369262...|
|       0.0|  0.0|[0.98906514405821...|
|       0.0|  0.0|[0.93238877114205...|
|       0.0|  0.0|[0.95997796495358...|
+----------+-----+--------------------+
only showing top 5 rows



In [41]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsSVM.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85443|
+----------+-----+



In [42]:
println(s"Confusion matrix:")
predictionsSVM.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85335|
|       0.0|  1.0|  108|
+----------+-----+-----+



In [43]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
// Note: the 0/1 "prediction" column is used as the raw score here, so the ROC curve has a single operating point.
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsSVM)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsSVM))
println("Accuracy = " + evaluator1.evaluate(predictionsSVM))
println("Weighted precision = " + evaluator2.evaluate(predictionsSVM))
println("Weighted recall = " + evaluator3.evaluate(predictionsSVM))
println("Weighted F1 = " + evaluator4.evaluate(predictionsSVM))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_5921ade73f0a
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_148eb487f040
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_63f482a17ebe
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_814d4afcf9f7
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_ff882b57b6cf
accuracy: Double = 0.998735999438222
Area Under ROC Curve = 0.5
Accuracy = 0.998735999438222
Weighted precision = 0.9974735965738643
Weighted recall = 0.998735999438222
Weighted F1 = 0.998104398834284
Test Error = 0.0012640005617780004
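
The precision and recall reported above come from the `weightedPrecision` and `weightedRecall` metrics, which average the per-class values by class support, so the 85,335 legitimate transactions dominate the result. The following is a minimal sketch in plain Scala (no Spark needed) that recomputes both views from the LinearSVC confusion matrix. It assumes label 1.0 (fraud) is the positive class; the `Confusion` case class and its method names are illustrative and not part of the Spark API.

```scala
// Confusion-matrix counts copied from the LinearSVC output above.
// Assumption of this sketch: label 1.0 (fraud) is the positive class.
final case class Confusion(tp: Long, fp: Long, fn: Long, tn: Long) {
  val total: Long = tp + fp + fn + tn

  // Return 0.0 instead of dividing by zero when a class is never predicted.
  private def ratio(num: Long, den: Long): Double =
    if (den == 0L) 0.0 else num.toDouble / den

  def fraudPrecision: Double = ratio(tp, tp + fp)
  def fraudRecall: Double    = ratio(tp, tp + fn)
  def legitPrecision: Double = ratio(tn, tn + fn)
  def legitRecall: Double    = ratio(tn, tn + fp)

  // Support-weighted averages over the two classes.
  def weightedPrecision: Double =
    (legitPrecision * (tn + fp) + fraudPrecision * (tp + fn)) / total
  def weightedRecall: Double =
    (legitRecall * (tn + fp) + fraudRecall * (tp + fn)) / total
}

val svm = Confusion(tp = 0L, fp = 0L, fn = 108L, tn = 85335L)
println(f"Fraud precision    = ${svm.fraudPrecision}%.4f")
println(f"Fraud recall       = ${svm.fraudRecall}%.4f")
println(f"Weighted precision = ${svm.weightedPrecision}%.6f")
println(f"Weighted recall    = ${svm.weightedRecall}%.6f")
```

With these counts the fraud-class recall is 0, while the weighted values agree with the evaluator output to several decimal places. On a dataset this unbalanced, the per-class figures are probably the more informative ones.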


In [44]:
// Logistic Regression classifier
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features").setMaxIter(10).setTol(1E-6).setFitIntercept(true).setRegParam(0.3).setElasticNetParam(0.8)
val t = System.nanoTime

lr: org.apache.spark.ml.classification.LogisticRegression = logreg_836fb4e1deba
t: Long = 27741171985836812


In [45]:
// Train the logistic regression model.
val Modellr = lr.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

Modellr: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid = logreg_836fb4e1deba, numClasses = 2, numFeatures = 29
durationtrain: Double = 1.237882838

initial model training finished.
Training process takes 1.237882838 secs


In [46]:
val s = System.nanoTime
val predictionsLR = Modellr.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsLR.select("prediction", "label", "features").show(5)

s: Long = 27741173827800361
predictionsLR: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 35 more fields]
durationprediction: Double = 0.211721649

prediction finished.
Prediction process takes 0.211721649 secs
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[0.93273307306919...|
|       0.0|  0.0|[0.99486362369262...|
|       0.0|  0.0|[0.98906514405821...|
|       0.0|  0.0|[0.93238877114205...|
|       0.0|  0.0|[0.95997796495358...|
+----------+-----+--------------------+
only showing top 5 rows



In [47]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsLR.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85443|
+----------+-----+



In [48]:
println("Confusion matrix:")
predictionsLR.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85335|
|       0.0|  1.0|  108|
+----------+-----+-----+



In [49]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsLR)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsLR))
println("Accuracy = " + evaluator1.evaluate(predictionsLR))
println("Weighted precision = " + evaluator2.evaluate(predictionsLR))
println("Weighted recall = " + evaluator3.evaluate(predictionsLR))
println("Weighted F1 = " + evaluator4.evaluate(predictionsLR))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_887c612e2ed6
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_ca567f8fca05
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_cd447952483f
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_b5e085d87701
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_ea6b8f82996e
accuracy: Double = 0.998735999438222
Area Under ROC Curve = 0.5
Accuracy = 0.998735999438222
Weighted precision = 0.9974735965738643
Weighted recall = 0.998735999438222
Weighted F1 = 0.998104398834284
Test Error = 0.0012640005617780004
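
An area under the ROC curve of exactly 0.5 is what any constant score produces, and here the evaluator receives the 0/1 `prediction` column as its score while every prediction is 0.0. The sketch below uses a pairwise (rank-based) definition of AUC on hypothetical toy scores, not on the notebook's data, to contrast ranking by a continuous score with ranking by a constant label. The function name `auc` and the sample values are assumptions made purely for illustration.

```scala
// Rank-based AUC: the fraction of (positive, negative) pairs in which the
// positive example gets the higher score; ties count as half a win.
// This pairwise version is quadratic and only meant for small illustrations.
def auc(scored: Seq[(Double, Int)]): Double = {
  val pos = scored.collect { case (s, 1) => s }
  val neg = scored.collect { case (s, 0) => s }
  require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
  val wins = for (p <- pos; n <- neg) yield {
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  }
  wins.sum / (pos.size.toLong * neg.size)
}

// Hypothetical toy data as (score, label) pairs; not taken from the notebook.
val rawScores = Seq((0.9, 1), (0.4, 1), (0.35, 0), (0.2, 0), (0.1, 0))
// The same examples when the model flags nothing: every score collapses to 0.0.
val hardLabels = rawScores.map { case (_, y) => (0.0, y) }

println(s"AUC from continuous scores = ${auc(rawScores)}")
println(s"AUC from constant labels   = ${auc(hardLabels)}")
```

The continuous scores rank both positives above all negatives, whereas the constant labels tie every pair and fall back to 0.5. On the Spark side, the analogous change would presumably be to point the evaluator at a continuous score column such as `rawPrediction`, provided the fitted model emits one.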


In [50]:
// Naive Bayes classifier
// Train a NaiveBayes model.
val nb = new NaiveBayes().setLabelCol("label").setFeaturesCol("features").setSmoothing(1.0)
val t = System.nanoTime

nb: org.apache.spark.ml.classification.NaiveBayes = nb_75107e3d9433
t: Long = 27741181556658039


In [51]:
/* Optional min-max rescaling of the features to [0, 1]; fit the scaler on the training split only and reuse it for the validation split.
val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("featuresScaled")
val pipeline = new Pipeline().setStages(Array(scaler))
val pipelineModel = pipeline.fit(trainingData)
val datatrain = pipelineModel.transform(trainingData)
val datatest = pipelineModel.transform(validationData)*/

In [52]:
val modelNB = nb.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

modelNB: org.apache.spark.ml.classification.NaiveBayesModel = NaiveBayesModel (uid=nb_75107e3d9433) with 2 classes
durationtrain: Double = 1.282913141

initial model training finished.
Training process takes 1.282913141 secs


In [53]:
val s = System.nanoTime
val predictionsNB = modelNB.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsNB.select("prediction", "label").show(5)

s: Long = 27741183654202391
predictionsNB: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 35 more fields]
durationprediction: Double = 0.227650635

prediction finished.
Prediction process takes 0.227650635 secs
+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 5 rows



In [54]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsNB.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85443|
+----------+-----+



In [55]:
println("Confusion matrix:")
predictionsNB.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85335|
|       0.0|  1.0|  108|
+----------+-----+-----+



In [56]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsNB)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsNB))
println("Accuracy = " + evaluator1.evaluate(predictionsNB))
println("Weighted precision = " + evaluator2.evaluate(predictionsNB))
println("Weighted recall = " + evaluator3.evaluate(predictionsNB))
println("Weighted F1 = " + evaluator4.evaluate(predictionsNB))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_2c2d1eed1136
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_04d0077b48ea
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_a537dd92569d
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_45f871497299
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_22dbbdc9413c
accuracy: Double = 0.998735999438222
Area Under ROC Curve = 0.5
Accuracy = 0.998735999438222
Weighted precision = 0.9974735965738643
Weighted recall = 0.998735999438222
Weighted F1 = 0.998104398834284
Test Error = 0.0012640005617780004


In [57]:
// Multilayer Perceptron Classifier
// create the trainer and set its parameters
val t = System.nanoTime
// Network layout: 29 input features, two hidden layers (15 and 7 units), 2 output classes.
val layers = Array[Int](29, 15, 7, 2)
val mlp = new MultilayerPerceptronClassifier().setLayers(layers).setLabelCol("label").setFeaturesCol("features").setTol(1E-4).setBlockSize(128).setSeed(1234L).setMaxIter(25)

t: Long = 27741190143350460
layers: Array[Int] = Array(29, 15, 7, 2)
mlp: org.apache.spark.ml.classification.MultilayerPerceptronClassifier = mlpc_3992e9abc817


In [58]:
// Train a Multilayer Perceptron model.
val modelMLP = mlp.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

modelMLP: org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel = mlpc_3992e9abc817
durationtrain: Double = 9.314514534

initial model training finished.
Training process takes 9.314514534 secs


In [59]:
val s = System.nanoTime
val predictionsMLP = modelMLP.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsMLP.select("prediction","label").show(5)

s: Long = 27741200267015758
predictionsMLP: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 35 more fields]
durationprediction: Double = 0.211891287

prediction finished.
Prediction process takes 0.211891287 secs
+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 5 rows



In [60]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsMLP.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85365|
|       1.0|   78|
+----------+-----+



In [61]:
println("Confusion matrix:")
predictionsMLP.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85333|
|       1.0|  0.0|    2|
|       0.0|  1.0|   32|
|       1.0|  1.0|   76|
+----------+-----+-----+



In [62]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val accuracy = evaluator1.evaluate(predictionsMLP)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsMLP))
println("Accuracy = " + evaluator1.evaluate(predictionsMLP))
println("Weighted precision = " + evaluator2.evaluate(predictionsMLP))
println("Weighted recall = " + evaluator3.evaluate(predictionsMLP))
println("Weighted F1 = " + evaluator4.evaluate(predictionsMLP))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_33ca3a3e3a16
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_bc71c8ee2c79
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_2f685d0415c1
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_a86f09c6fd57
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_c1acb93ff895
accuracy: Double = 0.999602073897218
Area Under ROC Curve = 0.8518401333307292
Accuracy = 0.999602073897218
Weighted precision = 0.9995932026620965
Weighted recall = 0.999602073897218
Weighted F1 = 0.9995700180496218
Test Error = 3.97926102781998E-4
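
Because the evaluator is given the 0/1 `prediction` column as its score, the ROC curve has a single operating point between (0, 0) and (1, 1), and the area under it reduces to the mean of the true-positive and true-negative rates. The sketch below checks this against the MLP confusion matrix above. The function name `singlePointAuc` is hypothetical, and fraud (label 1.0) is assumed to be the positive class.

```scala
// Area under a ROC curve made of one operating point (fpr, tpr) joined to
// (0, 0) and (1, 1) by straight lines, computed with the trapezoidal rule.
def singlePointAuc(tp: Long, fp: Long, fn: Long, tn: Long): Double = {
  val tpr = if (tp + fn == 0L) 0.0 else tp.toDouble / (tp + fn)
  val fpr = if (fp + tn == 0L) 0.0 else fp.toDouble / (fp + tn)
  // Trapezoid (0,0) -> (fpr,tpr) plus trapezoid (fpr,tpr) -> (1,1);
  // algebraically this equals (tpr + (1 - fpr)) / 2.
  fpr * tpr / 2.0 + (1.0 - fpr) * (tpr + 1.0) / 2.0
}

// Counts from the MLP confusion matrix above.
val mlpAuc = singlePointAuc(tp = 76L, fp = 2L, fn = 32L, tn = 85333L)
// Counts for a model that never predicts fraud (LinearSVC, LogisticRegression, NaiveBayes above).
val allNegativeAuc = singlePointAuc(tp = 0L, fp = 0L, fn = 108L, tn = 85335L)

println(f"MLP single-point AUC          = $mlpAuc%.6f")
println(f"All-negative single-point AUC = $allNegativeAuc%.6f")
```

Applied to the MLP counts this gives roughly 0.8518, in line with the value printed above, and for a model that never predicts fraud it gives 0.5. The reported figure therefore behaves like a balanced accuracy rather than a threshold-free ranking measure.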


In [63]:
// One Vs Rest Classifier using Logistic Regression classifier
val classifier = new LogisticRegression().setLabelCol("label").setFeaturesCol("features").setMaxIter(10).setTol(1E-6).setFitIntercept(true) 
val t = System.nanoTime

classifier: org.apache.spark.ml.classification.LogisticRegression = logreg_2c8221f7f537
t: Long = 27741207896263766


In [64]:
// train the multiclass model.
val ovr = new OneVsRest().setClassifier(classifier)
val Modelovr = ovr.fit(trainingData)
val durationtrain = (System.nanoTime - t) / 1e9d
println("\ninitial model training finished.")
println(s"Training process takes $durationtrain secs")

ovr: org.apache.spark.ml.classification.OneVsRest = oneVsRest_2bca44fa1d40
Modelovr: org.apache.spark.ml.classification.OneVsRestModel = oneVsRest_2bca44fa1d40
durationtrain: Double = 3.89588108

initial model training finished.
Training process takes 3.89588108 secs


In [65]:
val s = System.nanoTime
val predictionsOVR = Modelovr.transform(validationData)
val durationprediction = (System.nanoTime - s) / 1e9d
println("\nprediction finished.")
println(s"Prediction process takes $durationprediction secs")
predictionsOVR.select("prediction", "label", "features").show(5)

s: Long = 27741213899572675
predictionsOVR: org.apache.spark.sql.DataFrame = [V1: double, V2: double ... 34 more fields]
durationprediction: Double = 0.362011561

prediction finished.
Prediction process takes 0.362011561 secs
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[0.93273307306919...|
|       0.0|  0.0|[0.99486362369262...|
|       0.0|  0.0|[0.98906514405821...|
|       0.0|  0.0|[0.93238877114205...|
|       0.0|  0.0|[0.95997796495358...|
+----------+-----+--------------------+
only showing top 5 rows



In [66]:
println(s"Classified test set :")
validationData.groupBy("Class").count().show()
println(s"Prediction :")
predictionsOVR.groupBy("prediction").count().show()

Classified test set :
+-----+-----+
|Class|count|
+-----+-----+
|  0.0|85335|
|  1.0|  108|
+-----+-----+

Prediction :
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|85420|
|       1.0|   23|
+----------+-----+



In [67]:
println("Confusion matrix:")
predictionsOVR.select("prediction", "label").groupBy("prediction", "label").count().orderBy("label", "prediction").show()

Confusion matrix:
+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       0.0|  0.0|85335|
|       0.0|  1.0|   85|
|       1.0|  1.0|   23|
+----------+-----+-----+



In [68]:
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedPrecision")
val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("weightedRecall")
val evaluator4 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val areaUnderROC = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderROC")
val areaUnderPR = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("label").setMetricName("areaUnderPR")
val accuracy = evaluator1.evaluate(predictionsOVR)
println("Area Under ROC Curve = " + areaUnderROC.evaluate(predictionsOVR))
println("Area Under the Precision-Recall Curve = "  + areaUnderPR.evaluate(predictionsOVR))
println("Accuracy = " + evaluator1.evaluate(predictionsOVR))
println("Weighted precision = " + evaluator2.evaluate(predictionsOVR))
println("Weighted recall = " + evaluator3.evaluate(predictionsOVR))
println("Weighted F1 = " + evaluator4.evaluate(predictionsOVR))
println("Test Error = " + (1.0 - accuracy))

evaluator1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_5e0b7baff1e4
evaluator2: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_44d72abfb50d
evaluator3: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_81ccf0b98c78
evaluator4: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_359688e483f0
areaUnderROC: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_f9f93c14aec9
areaUnderPR: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_c8b9f2fd68ca
accuracy: Double = 0.9990051847430451
Area Under ROC Curve = 0.6064814814814815
Area Under the Precision-Recall Curve = 0.606978889109959
Accuracy = 0.9990051847430451
Weighted precision = 0.9990061746669134
Weighted recall = 0.9990051847430451
Weighted F1 = 0.9986826869394148
Test Error = 9.94815256954884E-4
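
The area under the precision-recall curve above is also computed from the 0/1 `prediction` column, so the curve has very few points. The sketch below rebuilds that area by hand from the OneVsRest confusion matrix, under the assumption that the curve starts at recall 0 with the precision of the first threshold and ends at recall 1 with precision equal to the fraud rate. The function name `singlePointAreaUnderPR` is hypothetical.

```scala
// Area under a precision-recall curve built from a 0/1 score (trapezoidal rule).
// Assumed curve points, as (recall, precision):
//   (0, p1)     starting point that reuses the first precision value
//   (r1, p1)    threshold at score 1.0: only rows predicted as fraud are flagged
//   (1, prior)  threshold at score 0.0: every row is flagged, so precision = fraud rate
def singlePointAreaUnderPR(tp: Long, fp: Long, fn: Long, tn: Long): Double = {
  require(tp + fp > 0L, "at least one row must be predicted positive")
  require(tp + fn > 0L, "at least one row must be labelled positive")
  val total = tp + fp + fn + tn
  val p1 = tp.toDouble / (tp + fp)
  val r1 = tp.toDouble / (tp + fn)
  val prior = (tp + fn).toDouble / total
  // Flat segment from recall 0 to r1, then a straight line down to (1, prior).
  r1 * p1 + (1.0 - r1) * (p1 + prior) / 2.0
}

// Counts from the OneVsRest confusion matrix above.
val ovrAreaUnderPR = singlePointAreaUnderPR(tp = 23L, fp = 0L, fn = 85L, tn = 85335L)
println(f"Area under PR (single operating point) = $ovrAreaUnderPR%.6f")
```

Under these assumptions the result is about 0.607, matching the evaluator output. Most of that area comes from the straight line drawn between the single operating point and the end of the curve, so the figure says little about how well the model ranks transactions.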
