In [2]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer,VectorAssembler}
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.evaluation.MulticlassMetrics
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

Data Pre-Processing
------------------

In [3]:
val path = "nassCDS.csv"
val readData = spark.read.option("header","true").option("inferSchema","true").option("sep", ",").csv(path)
val dataWithId  = readData.withColumnRenamed("_c0","row_id")
val cleanData1 = dataWithId.na.drop
val dvcatConvert = udf { (x: String) => 
    if (x == "24-Oct") "10-24"
    else if (x == "1-9km/h" ) "1-9"
    else x
}
val cleanData2 = cleanData1.withColumn("dvcatConverted" , dvcatConvert(cleanData1("dvcat")))
val filterConvert = udf { (x: String) => 
    if (x == "NA") ""
    else if (x != null ) x
    else ""
}
val cleanData3 = cleanData2.withColumn("yearVehFilter" , filterConvert(cleanData2("yearVeh")))
val cleanData4 = cleanData3.withColumn("injSeverityFilter" , filterConvert(cleanData3("injSeverity")))

val cleanData5 = cleanData4.na.drop

In [3]:
val dvcatIndexer = new StringIndexer().setInputCol("dvcatConverted").setOutputCol("DvcatIndex")
val dvcatIndexed = dvcatIndexer.fit(cleanData5).transform(cleanData5)

val deadIndexer = new StringIndexer().setInputCol("dead").setOutputCol("label")
val deadIndexed = deadIndexer.fit(dvcatIndexed).transform(dvcatIndexed)

val airbagInder = new StringIndexer().setInputCol("airbag").setOutputCol("airbagIndex")
val airbagIndexed = airbagInder.fit(deadIndexed).transform(deadIndexed)

val seatbeltIndexer = new StringIndexer().setInputCol("seatbelt").setOutputCol("seatbeltIndex")
val seatbeltIndexed = seatbeltIndexer.fit(airbagIndexed).transform(airbagIndexed)

val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
val sexIndexed = sexIndexer.fit(seatbeltIndexed).transform(seatbeltIndexed)

val abcatIndexer = new StringIndexer().setInputCol("abcat").setOutputCol("abcatIndex") 
val abcatIndexed = abcatIndexer.fit(sexIndexed).transform(sexIndexed)

val occRoleIndexer = new StringIndexer().setInputCol("occRole").setOutputCol("occRoleIndex")
val occRoleIndexed = occRoleIndexer.fit(abcatIndexed).transform(abcatIndexed)

val yearVehIndexer = new StringIndexer().setInputCol("yearVeh").setOutputCol("yearVehIndex")
val yearVehIndexed = yearVehIndexer.fit(occRoleIndexed).transform(occRoleIndexed)

val injSeverityIndexer = new StringIndexer().setInputCol("injSeverity").setOutputCol("injSeverityIndex")
val injSeverityIndexed = injSeverityIndexer.fit(yearVehIndexed).transform(yearVehIndexed)

In [4]:
val dvcatEncoder = new OneHotEncoder().setInputCol("DvcatIndex").setOutputCol("DvcatVec")
val dvcatEncoded = dvcatEncoder.transform(injSeverityIndexed)

val airbagEncoder = new OneHotEncoder().setInputCol("airbagIndex").setOutputCol("airbagVec")
val airbagEncoded = airbagEncoder.transform(dvcatEncoded)

val seatbeltEncoder = new OneHotEncoder().setInputCol("seatbeltIndex").setOutputCol("seatbeltVec")
val seatbeltEncoded = seatbeltEncoder.transform(airbagEncoded)

val sexEncoder = new OneHotEncoder().setInputCol("sexIndex").setOutputCol("sexVec")
val sexEncoded = sexEncoder.transform(seatbeltEncoded)

val abcatEncoder = new OneHotEncoder().setInputCol("abcatIndex").setOutputCol("abcatVec")
val abcatEncoded = abcatEncoder.transform(sexEncoded)

val occRoleEncoder = new OneHotEncoder().setInputCol("occRoleIndex").setOutputCol("occRoleVec")
val occRoleEncoded = occRoleEncoder.transform(abcatEncoded)

val yearVehEncoder = new OneHotEncoder().setInputCol("yearVehIndex").setOutputCol("yearVehVec")
val yearVehEncoded = yearVehEncoder.transform(occRoleEncoded)

In [5]:
val vecAssembler = new VectorAssembler().
    setInputCols(Array("DvcatVec","weight","airbagVec","seatbeltVec","frontal","sexVec","ageOFocc","yearacc",
    "yearVehVec","abcatVec","occRoleVec","deploy","injSeverityIndex")).
    setOutputCol("features")
val vecAssembled = vecAssembler.transform(yearVehEncoded)
val Array(train, test) = vecAssembled.randomSplit(Array(0.7, 0.3),seed=1)

Logistic Regression (with Weighted Column) 
---------------------------------------

In [6]:
//Weighted (Over-Sampling)

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
def balanceDataset(dataset: DataFrame): DataFrame = {

    // Re-balancing (weighting) of records to be used in the logistic loss objective function
    val numNegatives = dataset.filter(dataset("label") === 0).count
    val datasetSize = dataset.count
    val balancingRatio = (datasetSize - numNegatives).toDouble / datasetSize

    val calculateWeights = udf { d: Double =>
      if (d == 0.0) {
        1 * balancingRatio
      }
      else {
        (1 * (1.0 - balancingRatio))
      }
    }

    val weightedDataset = dataset.withColumn("classWeightCol", calculateWeights(dataset("label")))
    weightedDataset
  }

In [7]:
val weightedDF = balanceDataset(vecAssembled)
val Array(train_w, test_w) = weightedDF.randomSplit(Array(0.7, 0.3),seed=1)

In [8]:
var startTime_lr_w = System.currentTimeMillis()
val lr_w = new LogisticRegression().setLabelCol("label").setFeaturesCol("features").setWeightCol("classWeightCol").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val predictionDF_lr_w = lr_w.fit(train_w).transform(test_w)
var endTime_lr_w = System.currentTimeMillis()

In [9]:
//Model Evaluation
val predictionAndLabels_MCM_lr_w = predictionDF_lr_w.select("prediction","label").as[(Double,Double)].rdd
val metrics_lr_w = new MulticlassMetrics(predictionAndLabels_MCM_lr_w)
val predictionLabelsRDD_lr_w = predictionDF_lr_w.select("prediction", "label").map(r => (r.getDouble(0), r.getDouble(1)))
val bMetrics_lr_w = new BinaryClassificationMetrics(predictionLabelsRDD_lr_w.rdd)
println("Confusion Matrix of Logistics Regression is")
println(metrics_lr_w.confusionMatrix)
println("\nThe Accuracy of Logistics Regression is")
println(metrics_lr_w.accuracy)
println("\nThe ROC for Logistics Regression is")
println(bMetrics_lr_w.roc.collect().foreach(print))
print("\n")
println("\nThe Area under ROC for Logistics Regression is")
println(bMetrics_lr_w.areaUnderROC)
println("\nThe Precision of Logistics Regression is")
println(metrics_lr_w.precision)
println("\nThe Precision by Threshold for Logistics Regression is")
println(bMetrics_lr_w.precisionByThreshold.collect().foreach(print))
println("\nExecution Time for Training and Prediction of Logistics Regression is")
println(endTime_lr_w-startTime_lr_w + "ms")

Confusion Matrix of Logistics Regression is
6117.0  1360.0  
23.0    315.0   

The Accuracy of Logistics Regression is
0.8230326295585413

The ROC for Logistics Regression is
(0.0,0.0)(0.18189113280727565,0.9319526627218935)(1.0,1.0)(1.0,1.0)()

The Area under ROC for Logistics Regression is
0.8750307649573089

The Precision of Logistics Regression is
0.8230326295585413

The Precision by Threshold for Logistics Regression is
(1.0,0.1880597014925373)(0.0,0.043250159948816376)()

Execution Time for Training and Prediction of Logistics Regression is
4634ms


### Analysis

Here we have used weighted column approach to overcome the unbalanced dataset issues. Now the evaluations results for logistical regression looks more realistic. It is expected that logistic regression to do good with categorical variable and classification and 82.3 is not a bad figure for accuracy in that regard.But one thing to note here is that the execution time for logistical regression has been increased compared to the one in CASE STUDY 1. It might be because we are using a weighted column approach. It is the best approach for logistical regression to deal with unbalanced data.

Decision Tree (With Defined thresholds)
------------------------------------

In [18]:
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
var startTime_dt_w = System.currentTimeMillis()
val dt_w = new DecisionTreeClassifier().setLabelCol("label").setFeaturesCol("features").setThresholds(Array(0.95,0.05))
val predictionDF_dt_w = dt_w.fit(train).transform(test)
var endTime_dt_w = System.currentTimeMillis()

In [19]:
//Model Evaluation
val predictionAndLabels_MCM_dt_w = predictionDF_dt_w.select("prediction","label").as[(Double,Double)].rdd
val metrics_dt_w = new MulticlassMetrics(predictionAndLabels_MCM_dt_w)
val predictionLabelsRDD_dt_w = predictionDF_dt_w.select("prediction", "label").map(r => (r.getDouble(0), r.getDouble(1)))
val bMetrics_dt_w = new BinaryClassificationMetrics(predictionLabelsRDD_dt_w.rdd)
println("Confusion Matrix of Decision Tree is")
println(metrics_dt_w.confusionMatrix)
println("\nThe Accuracy of Decision Tree is")
println(metrics_dt_w.accuracy)
println("\nThe ROC for Decision Tree is")
println(bMetrics_dt_w.roc.collect().foreach(print))
print("\n")
println("\nThe Area under ROC for Decision Tree is")
println(bMetrics_dt_w.areaUnderROC)
println("\nThe Precision of Decision Tree is")
println(metrics_dt_w.precision)
println("\nThe Precision by Threshold for Decision Tree is")
println(bMetrics_dt_w.precisionByThreshold.collect().foreach(print))
println("\nExecution Time for Training and Prediction of Decision Tree is")
println(endTime_dt_w-startTime_dt_w + "ms")

Confusion Matrix of Decision Tree is
7331.0  146.0  
17.0    321.0  

The Accuracy of Decision Tree is
0.9791426743442099

The ROC for Decision Tree is
(0.0,0.0)(0.019526548080781064,0.9497041420118343)(1.0,1.0)(1.0,1.0)()

The Area under ROC for Decision Tree is
0.9650887969655266

The Precision of Decision Tree is
0.9791426743442099

The Precision by Threshold for Decision Tree is
(1.0,0.6873661670235546)(0.0,0.043250159948816376)()

Execution Time for Training and Prediction of Decision Tree is
4730ms


### Analysis

Here to overcome the umbalanced data problem we are setting the threshold values while defining the model. The weighted column approach is not yet available in decision trees. Setting thresholds is not a recommended approach in other languages but in spark this is the best solution currently availabe to deal with unbalanced data. 'Precision by Threshold' is almost balanced in this approach and that is the proof that we are getting a realistic accuracy rate. Changing the seed value and repeatedly executing the model was giving accuracy around 95% in some scenarios and an even more balanced 'Precision by Threshold' values. As decision trees are known for over-fitting we cannot relay much on this model

Random Forest (with Defined Thresholds)
------------------------------------

In [10]:
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
var startTime_rf_w = System.currentTimeMillis()
val rf_w = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(500).setThresholds(Array(0.95,0.05))
val predictionDF_rf_w = rf_w.fit(train).transform(test)
var endTime_rf_w = System.currentTimeMillis()

In [11]:
//Model Evaluation
val predictionAndLabels_MCM_rf_w = predictionDF_rf_w.select("prediction","label").as[(Double,Double)].rdd
val metrics_rf_w = new MulticlassMetrics(predictionAndLabels_MCM_rf_w)
val predictionLabelsRDD_rf_w = predictionDF_rf_w.select("prediction", "label").map(r => (r.getDouble(0), r.getDouble(1)))
val bMetrics_rf_w = new BinaryClassificationMetrics(predictionLabelsRDD_rf_w.rdd)
println("Confusion Matrix of Random Forest is")
println(metrics_rf_w.confusionMatrix)
println("\nThe Accuracy of Random Forest is")
println(metrics_rf_w.accuracy)
println("\nThe ROC for Random Forest is")
println(bMetrics_rf_w.roc.collect().foreach(print))
print("\n")
println("\nThe Area under ROC for Random Forest is")
println(bMetrics_rf_w.areaUnderROC)
println("\nThe Precision of Random Forest is")
println(metrics_rf_w.precision)
println("\nThe Precision by Threshold for Random Forest is")
println(bMetrics_rf_w.precisionByThreshold.collect().foreach(print))
println("\nExecution Time for Training and Prediction of Random Forest is")
println(endTime_rf_w-startTime_rf_w + "ms")

Confusion Matrix of Random Forest is
6708.0  769.0  
11.0    327.0  

The Accuracy of Random Forest is
0.9001919385796545

The ROC for Random Forest is
(0.0,0.0)(0.10284873612411395,0.9674556213017751)(1.0,1.0)(1.0,1.0)()

The Area under ROC for Random Forest is
0.9323034425888306

The Precision of Random Forest is
0.9001919385796545

The Precision by Threshold for Random Forest is
(1.0,0.2983576642335766)(0.0,0.043250159948816376)()

Execution Time for Training and Prediction of Random Forest is
22098ms


### Analysis

Here also to deal with unbalanced data we are setting the thresholds while defining the model. Just as in decison tree Random Forest don't have weighted column approach at this point of time however this feature is in development phase now(Proof: https://issues.apache.org/jira/browse/SPARK-9478 ). Here we are getting a better 'precision by threshold' balance. The accuracy however has been reduced. This might be beacause of the fact that decision tree is over-fitting the data and when it comes to Random Forest because of bootstraping and other features are build-in, it is able to analyse the data in a better way. For this reason Random Forest is a more reliable model than decision tree. Another important thing to not here is that even after incorporating threshold into the model there is not much change in execution time.

Gradient-boosted tree classifier (With Defined Thresholds)
------------------------------------------------------

In [18]:
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
var startTime_gbt_w = System.currentTimeMillis()
val gbt_w = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").setMaxIter(10).setThresholds(Array(0.90,0.10))
val predictionDF_gbt_w = gbt_w.fit(train).transform(test)
var endTime_gbt_w = System.currentTimeMillis()

In [19]:
//Model Evaluation
val predictionAndLabels_MCM_gbt_w = predictionDF_gbt_w.select("prediction","label").as[(Double,Double)].rdd
val metrics_gbt_w = new MulticlassMetrics(predictionAndLabels_MCM_gbt_w)
val predictionLabelsRDD_gbt_w = predictionDF_gbt_w.select("prediction", "label").map(r => (r.getDouble(0), r.getDouble(1)))
val bMetrics_gbt_w = new BinaryClassificationMetrics(predictionLabelsRDD_gbt_w.rdd)
println("Confusion Matrix of Gradient-Boosted Tree is")
println(metrics_gbt_w.confusionMatrix)
println("\nThe Accuracy of Gradient-Boosted Tree is")
println(metrics_gbt_w.accuracy)
println("\nThe ROC for Gradient-Boosted Tree is")
println(bMetrics_gbt_w.roc.collect().foreach(print))
print("\n")
println("\nThe Area under ROC for Gradient-Boosted Tree is")
println(bMetrics_gbt_w.areaUnderROC)
println("\nThe Precision of Gradient-Boosted Tree is")
println(metrics_gbt_w.precision)
println("\nThe Precision by Threshold for Gradient-Boosted Tree is")
println(bMetrics_gbt_w.precisionByThreshold.collect().foreach(print))
println("\nExecution Time for Training and Prediction of Gradient-Boosted Tree is")
println(endTime_gbt_w-startTime_gbt_w + "ms")

Confusion Matrix of Gradient-Boosted Tree is
7430.0  47.0   
23.0    315.0  

The Accuracy of Gradient-Boosted Tree is
0.9910428662827895

The ROC for Gradient-Boosted Tree is
(0.0,0.0)(0.006285943560251438,0.9319526627218935)(1.0,1.0)(1.0,1.0)()

The Area under ROC for Gradient-Boosted Tree is
0.962833359580821

The Precision of Gradient-Boosted Tree is
0.9910428662827895

The Precision by Threshold for Gradient-Boosted Tree is
(1.0,0.8701657458563536)(0.0,0.043250159948816376)()

Execution Time for Training and Prediction of Gradient-Boosted Tree is
7213ms


### Analysis

Since Spark dataframe is an evolving technology especially for machine learning libraries, there is not much reference over the internet on in Spark official document on how to effectively handle imbalanced dataset. For this study we followed the same approach we used for decison tree and random forest as all of them are tree based model. However it didn't make much difference in the evaluation results, It does changed the 'Precision by threshold' results by a small margin but not much.The execution time for Gradient Booosted Tree without and with defined threshold were observed to be almost same. Gradient Boosted Trees are expected to give the best results but in this special case the though the results are good the question is how reliable they are? We feel it's not handling the imbalanced data effectively. 

## CASE STUDY 2 CONCLUSION

Our attempts to handle umbalanced data in spark models itself made us explore different routes. Weighted Column is the approach that spark has taken to handle this kind of scenarios. As of now its been implemented only for logistic regression. For tree based models this feature is in development phase and hence we have taking the approach of setting thresholds on the models explicitly. Though this is not a recommended approach in python or R, In spark this is the best possible solution available. We were a bit curious to see how RDD handled this scenario in spark and that led us to CASE STUDY 3.