In [1]:
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression._

// Read the covtype data file; each line is one comma-separated record.
val rawData = sc.textFile("file:///var/lib/myspark/dataset/covtype.data")

// Convert every record into a LabeledPoint: all columns except the last are
// features, and the last column is the class value shifted down by one.
val data = rawData.map { record =>
    val fields = record.split(',').map(_.toDouble)
    val target = fields.last - 1
    LabeledPoint(target, Vectors.dense(fields.init))
}
data.count()

581012

In [2]:
// 80% training / 10% cross-validation / 10% test split of the parsed data.
val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
// Mark all three splits for caching; the final call stays a standalone
// expression so the cell still evaluates to the cached test RDD.
Seq(trainData, cvData).foreach(_.cache())
testData.cache()

MapPartitionsRDD[5] at randomSplit at <console>:37

In [3]:
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd._

/**
 * Scores a decision tree against a labelled data set.
 *
 * @param model trained decision tree used to predict a class for each example
 * @param data  labelled examples to evaluate on
 * @return a MulticlassMetrics built from the (prediction, label) pairs
 */
def getMetrics(model: DecisionTreeModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
    val scored = data.map { point =>
        val predicted = model.predict(point.features)
        (predicted, point.label)
    }
    new MulticlassMetrics(scored)
}

// Baseline tree: 7 classes, no categorical features, gini impurity,
// depth limited to 4 and at most 100 bins.
val model = DecisionTree.trainClassifier(
    trainData,
    numClasses = 7,
    categoricalFeaturesInfo = Map.empty[Int, Int],
    impurity = "gini",
    maxDepth = 4,
    maxBins = 100)

// Evaluate on the cross-validation split and show the confusion matrix.
val metrics = getMetrics(model, cvData)
metrics.confusionMatrix

14098.0  6829.0   1.0     0.0  0.0  0.0  348.0
5382.0   22718.0  474.0   0.0  4.0  0.0  28.0
0.0      710.0    2826.0  0.0  0.0  0.0  0.0
0.0      0.0      273.0   0.0  0.0  0.0  0.0
0.0      890.0    25.0    0.0  8.0  0.0  0.0
0.0      413.0    1312.0  0.0  0.0  0.0  0.0
1153.0   45.0     0.0     0.0  0.0  0.0  822.0

In [4]:
// Overall precision of the baseline tree on the cross-validation split
// (`metrics` comes from getMetrics(model, cvData)).
println(metrics.precision)

0.6935005740331397


In [5]:
// Print one (precision, recall) pair per class label 0..6.
(for (category <- 0 until 7)
    yield (metrics.precision(category), metrics.recall(category))
).foreach(println)

(0.6832743663064024,0.6626245534874976)
(0.7188103148236039,0.794169055442914)
(0.5754428833231521,0.7992081447963801)
(0.0,0.0)
(0.6666666666666666,0.00866738894907909)
(0.0,0.0)
(0.6861435726210351,0.4069306930693069)


In [6]:
import org.apache.spark.rdd._

/**
 * Computes the empirical class distribution of a labelled data set.
 *
 * @param data labelled examples
 * @return the fraction of examples carrying each label, ordered by ascending
 *         label value; a label that never occurs in `data` gets no entry, and
 *         an empty data set yields an empty array
 */
def classProbabilities(data: RDD[LabeledPoint]): Array[Double] = {
    val countByCategory = data.map(_.label).countByValue()
    // Order the per-label counts by label so position i lines up across data sets.
    val counts = countByCategory.toArray.sortBy(_._1).map(_._2)
    // Sum once up front; previously the whole array was re-summed for every element.
    val total = counts.sum.toDouble
    counts.map(_ / total)
}

val trainPriorProbabilities = classProbabilities(trainData)
val cvPriorProbabilities = classProbabilities(cvData)
// Sum over classes of P(class in train) * P(class in CV): the chance that a
// guess drawn from the training priors matches a CV example's class.
trainPriorProbabilities.zip(cvPriorProbabilities).map(pair => pair._1 * pair._2).sum

0.37800805718266617

In [7]:
// Grid search over impurity x depth x bins; each combination is trained on
// trainData and scored by overall precision on cvData.
val evaluations =
    Array("gini", "entropy").flatMap { impurity =>
        Array(1, 20).flatMap { depth =>
            Array(10, 300).map { bins =>
                val tree = DecisionTree.trainClassifier(
                    trainData, 7, Map[Int,Int](), impurity, depth, bins)
                val cvScore = getMetrics(tree, cvData).precision
                ((impurity, depth, bins), cvScore)
            }
        }
    }

// Best score first. sortBy + reverse is kept (rather than a negated key)
// because it determines the order in which tied entries are printed.
evaluations.sortBy { case (_, score) => score }.reverse.foreach(println)

((entropy,20,300),0.9135009167394917)
((gini,20,300),0.9025857194263096)
((entropy,20,10),0.8965712229476174)
((gini,20,10),0.8935211364142635)
((gini,1,10),0.6379821449990576)
((gini,1,300),0.6362343426035402)
((entropy,1,300),0.4901728953546154)
((entropy,1,10),0.4901728953546154)


In [8]:
// Retrain on training + CV data combined: entropy impurity, depth 20, 300 bins.
val newModel = DecisionTree.trainClassifier(
    trainData.union(cvData),
    numClasses = 7,
    categoricalFeaturesInfo = Map.empty[Int, Int],
    impurity = "entropy",
    maxDepth = 20,
    maxBins = 300)
// (prediction, label) pairs for the test split.
val predicitionsAndLabelsNew = testData.map { point =>
    (newModel.predict(point.features), point.label)
}

In [9]:
// Overall F-measure computed from the test-split (prediction, label) pairs.
// NOTE(review): the recorded output equals that of the precision and recall
// cells exactly, so these no-argument metrics appear to be one overall figure.
val F1 = {
    val testMetrics = new MulticlassMetrics(predicitionsAndLabelsNew)
    testMetrics.fMeasure
}
println(F1)

0.9149212169431136


In [10]:
// Overall precision computed from the test-split (prediction, label) pairs.
val precision = {
    val testMetrics = new MulticlassMetrics(predicitionsAndLabelsNew)
    testMetrics.precision
}
println(precision)

0.9149212169431136


In [11]:
// Overall recall computed from the test-split (prediction, label) pairs.
val recall = {
    val testMetrics = new MulticlassMetrics(predicitionsAndLabelsNew)
    testMetrics.recall
}
println(recall)

0.9149212169431136


In [12]:
// Dump the learned tree's if/else structure as text. The printout is long:
// the recorded output reports a tree of depth 20 with 30769 nodes.
println(newModel.toDebugString)

DecisionTreeModel classifier of depth 20 with 30769 nodes
  If (feature 0 <= 2694.0)
   If (feature 0 <= 2488.0)
    If (feature 0 <= 2378.0)
     If (feature 3 <= 0.0)
      If (feature 5 <= 644.0)
       If (feature 12 <= 0.0)
        If (feature 9 <= 959.0)
         If (feature 5 <= 90.0)
          If (feature 1 <= 21.0)
           If (feature 1 <= 17.0)
            Predict: 2.0
           Else (feature 1 > 17.0)
            If (feature 0 <= 2034.0)
             Predict: 5.0
            Else (feature 0 > 2034.0)
             Predict: 3.0
          Else (feature 1 > 21.0)
           Predict: 2.0
         Else (feature 5 > 90.0)
          If (feature 6 <= 202.0)
           If (feature 14 <= 0.0)
            If (feature 23 <= 0.0)
             If (feature 18 <= 0.0)
              If (feature 7 <= 233.0)
               If (feature 2 <= 22.0)
                If (feature 0 <= 2324.0)
                 If (feature 5 <= 618.0)
                  If (feature 2 <= 17.0)
                   If (f

In [13]:
// Re-encode the raw records: keep the first 10 columns as-is and collapse
// columns 10-13 and 14-53 (presumably one-hot indicator groups; verify against
// the dataset description) into one feature each, the position of the 1.0.
val newData = rawData.map { record =>
    val fields = record.split(",").map(_.toDouble)
    // indexOf returns -1 when no column in the group equals 1.0.
    val wildernessIdx = fields.slice(10, 14).indexOf(1.0).toDouble
    val soilIdx = fields.slice(14, 54).indexOf(1.0).toDouble
    val features = fields.slice(0, 10) ++ Array(wildernessIdx, soilIdx)
    // The class is the last column, shifted down by one.
    LabeledPoint(fields.last - 1, Vectors.dense(features))
}
// 80/10/10 split of the re-encoded data.
val Array(trainDataNew, cvDataNew, testDataNew) = newData.randomSplit(Array(0.8, 0.1, 0.1))
Seq(trainDataNew, cvDataNew, testDataNew).foreach(_.cache())
// Count every split right after marking it for caching; the test-split count
// stays the final expression so the cell still evaluates to it.
Seq(trainDataNew, cvDataNew).foreach(_.count())
testDataNew.count

58505

In [14]:
// Grid search on the re-encoded data. Features 10 and 11 are declared
// categorical with 4 and 40 distinct values respectively. Each combination
// records the F-measure on both the training and the CV split.
val evaluationsNew =
    for {
        impurity <- Array("gini", "entropy")
        depth    <- Array(10, 20, 30)
        bins     <- Array(40, 300)
    } yield {
        val tree = DecisionTree.trainClassifier(
            trainDataNew, 7, Map[Int,Int](10->4, 11->40), impurity, depth, bins)
        val trainScore = getMetrics(tree, trainDataNew).fMeasure
        val cvScore = getMetrics(tree, cvDataNew).fMeasure
        ((impurity, depth, bins), (trainScore, cvScore))
    }

// Order by training-set score, highest first.
evaluationsNew.sortBy { case (_, (trainScore, _)) => trainScore }.reverse.foreach(println)

((gini,30,300),(0.9997825478502379,0.936954012095732))
((entropy,30,300),(0.999694275195384,0.9425021968744077))
((gini,30,40),(0.9996705922879842,0.9346106793941795))
((entropy,30,40),(0.9996382974142571,0.9402794768854351))
((gini,20,300),(0.9732383146381898,0.9260471768010062))
((gini,20,40),(0.9700927939371757,0.9253579613005497))
((entropy,20,300),(0.9696751135703059,0.9276668332270793))
((entropy,20,40),(0.9695071802269253,0.9274428381894309))
((gini,10,300),(0.7961784399422999,0.7958371383772421))
((gini,10,40),(0.7902943139492324,0.7904957182487035))
((entropy,10,40),(0.7834111998622085,0.781639299067836))
((entropy,10,300),(0.780282903093849,0.7778141530403019))


In [15]:
// Random forest of 10 entropy trees (depth up to 30, 300 bins) on the
// re-encoded training split; features 10 and 11 are categorical.
val forest = RandomForest.trainClassifier(
    trainDataNew,
    7,
    Map(10->4, 11->40),
    numTrees = 10,
    featureSubsetStrategy = "auto",
    impurity = "entropy",
    maxDepth = 30,
    maxBins = 300)

In [17]:
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd._

/**
 * Scores a random forest against a labelled data set.
 *
 * @param model trained forest used to predict a class for each example
 * @param data  labelled examples to evaluate on
 * @return a MulticlassMetrics built from the (prediction, label) pairs
 */
def getMetricsForest(model: RandomForestModel, data: RDD[LabeledPoint]): MulticlassMetrics =
    new MulticlassMetrics(data.map(point => (model.predict(point.features), point.label)))

In [19]:
// F-measure of the forest on the training split and on the CV split.
val trainAccuracy4forest = getMetricsForest(forest, trainDataNew).fMeasure
val cvAccuracy4forest = getMetricsForest(forest, cvDataNew).fMeasure
println(s"trainAccuracy4forest = $trainAccuracy4forest, cvAccuracy4forest = $cvAccuracy4forest")

trainAccuracy4forest = 0.9978663853424333, cvAccuracy4forest = 0.9583196926098868


In [20]:
// One hand-written record with 12 comma-separated values, the same number of
// features that the re-encoded data set uses.
val input = "2709,125,28,67,23,3224,253,207,61,6094,0,29"
val vector = Vectors.dense(for (field <- input.split(',')) yield field.toDouble)
// Class predicted by the forest for this single example.
forest.predict(vector)

4.0