In [1]:
// Imports and starting the spark session.
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
19,application_1574540295268_0009,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

In [2]:
// Load the train and test datasets (0.1m dataset) from the CSV files stored in the Azure Storage account attached to the cluster.

/**
 * Reads a numeric CSV file into an RDD of labeled points with sparse features.
 *
 * Every comma-separated field of every line is parsed with `toDouble`, so a line
 * containing a non-numeric field (for example a header row) fails when the RDD is
 * evaluated. The first field is used as the label; the remaining fields are the
 * feature values, of which only the non-zero ones are stored.
 *
 * @param fname path or URI handed to `sc.textFile`
 * @return one `LabeledPoint` per input line
 */
def csv_to_sparse_labpoint(fname:String) : org.apache.spark.rdd.RDD[LabeledPoint] = {
  sc.textFile(fname).map { line =>
    val fields = line.split(',').map(_.toDouble)
    val label = fields(0)
    val X = fields.drop(1)
    // Keep (value, position) pairs for the non-zero entries only. zipWithIndex and
    // filter preserve order, so the indices stay strictly increasing.
    val nonZero = X.zipWithIndex.filter { case (value, _) => value != 0 }
    val features = Vectors.sparse(X.length, nonZero.map(_._2), nonZero.map(_._1))
    LabeledPoint(label, features)
  }
}

// Training set, as loaded from the CSV file (repartitioned in a later cell).
val d_train_0 = csv_to_sparse_labpoint("wasb://bdclusterstore@bdclusterhdistorage1.blob.core.windows.net/csvFiles/spark-train-0.1m.csv")
// Held-out test set, used for the AUC / accuracy / error metrics below.
val d_test = csv_to_sparse_labpoint("wasb://bdclusterstore@bdclusterhdistorage1.blob.core.windows.net/csvFiles/spark-test-0.1m.csv")


d_test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[5] at map at <console>:30

In [3]:
// Repartition the training dataset, then cache both the training and test datasets.
// Number of partitions of the training RDD as loaded.
d_train_0.partitions.size
// Spread the training data over 32 partitions.
// NOTE(review): 32 is presumably chosen to match the cluster's parallelism; confirm.
val d_train = d_train_0.repartition(32)
d_train.partitions.size

// Mark both RDDs for caching so that training and scoring can reuse them.
d_train.cache()
d_test.cache()

// count() is an action: it forces both RDDs to be evaluated here, before the timed training step.
d_test.count()
d_train.count()

res9: Long = 100000

In [4]:
/* Model training and measuring the time taken for model training.
# Parameters defined are:
##  number_of_trees       = 100
##  max_bins              = 50
##  max_depth             = 20
##  impurity              = gini 
##  featureSubsetStrategy = "sqrt" */

// Random-forest settings passed to RandomForest.trainClassifier below.
val numClasses = 2
val categoricalFeaturesInfo = Map.empty[Int, Int]  // no entries: no feature is declared categorical
val numTrees = 100
val featureSubsetStrategy = "sqrt"
val impurity = "gini"
val maxDepth = 20
val maxBins = 50

// Time only the training call; the result is reported in seconds.
val now = System.nanoTime
val model = RandomForest.trainClassifier(
  d_train,
  numClasses,
  categoricalFeaturesInfo,
  numTrees,
  featureSubsetStrategy,
  impurity,
  maxDepth,
  maxBins
)
val total_time = (System.nanoTime - now) / 1e9
println(s"Time taken is: $total_time")

Time taken is: 56.586502137

In [5]:
// Prediction and AUC calculation for the model.

import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
/**
 * Walks a decision tree from `node` down to a leaf for the given feature vector
 * and returns a soft score instead of a hard class label.
 *
 * At a leaf the score is the leaf's `prob` when its predicted label is 1.0, and
 * `1.0 - prob` otherwise (presumably the probability of class 1; confirm against
 * the MLlib `Predict` documentation).
 *
 * @param node     node to start from
 * @param features feature vector to route through the tree
 * @return the soft score read from the leaf that `features` reaches
 */
def softPredict(node: Node, features: Vector): Double = {
  if (!node.isLeaf) {
    val split = node.split.get
    val featureValue = features(split.feature)
    // Continuous splits send values at or below the threshold to the left;
    // categorical splits send values listed in `categories` to the left.
    val goLeft =
      if (split.featureType == Continuous) featureValue <= split.threshold
      else split.categories.contains(featureValue)
    val child = if (goLeft) node.leftNode else node.rightNode
    softPredict(child.get, features)
  } else {
    val leaf = node.predict
    if (leaf.predict == 1.0) leaf.prob else 1.0 - leaf.prob
  }
}
/** Soft score of a whole tree: evaluates `softPredict` starting at the tree's top node. */
def softPredict2(dt: DecisionTreeModel, features: Vector): Double =
  softPredict(dt.topNode, features)


// Score each test point with the average of the per-tree soft scores and pair it with its label.
val scoreAndLabels = d_test.map { point =>
  val treeScores = model.trees.map(softPredict2(_, point.features))
  (treeScores.sum / model.numTrees, point.label)
}
// Area under the ROC curve computed from the (score, label) pairs.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.areaUnderROC()

res14: Double = 0.7038522198280078

In [6]:
// Pair the forest's predicted label with the true label of every test point;
// the remaining metric cells are computed from these pairs.
val predictionAndLabels = d_test.map { point =>
  (model.predict(point.features), point.label)
}

predictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[94] at map at <console>:33

In [7]:
// Accuracy: fraction of test points whose predicted label equals the true label.
val accuracy = {
  val correct = predictionAndLabels.filter { case (prediction, label) => prediction == label }.count()
  correct.toDouble / predictionAndLabels.count().toDouble
}
println(s"Accuracy = $accuracy")

Accuracy = 0.78515

In [8]:
// Test error: fraction of test points whose predicted label differs from the true label.
val testErr = {
  val misclassified = predictionAndLabels.filter { case (prediction, label) => prediction != label }.count
  misclassified.toDouble / d_test.count()
}
println(s"Test Error = $testErr")

Test Error = 0.21485

In [9]:
// Mean squared difference between predicted and true labels over the test set.
val testMSE = predictionAndLabels.map { case (prediction, label) =>
  math.pow(prediction - label, 2)
}.mean()
println(s"Test Mean Squared Error = $testMSE")

Test Mean Squared Error = 0.21484999999999888