In [None]:
机器学习算法之随机森林（Random Forest）

##### Bagging
Bagging方法是ensemble methods中获得用于训练base estimator的数据的重要一环。 正如其名，Bagging方法就是将所有training data放进一个黑色的bag中，黑色意味着我们看不到里面的数据的详细情况，只知道里面有我们的数据集。然后从这个bag中随机抽一部分数据出来用于训练一个base estimator。抽到的数据用完之后我们有两种选择，放回或不放回。

既然样本本身可以bagging，那么feature是不是也可以bagging呢？当然可以！bagging完数据本身之后我们可以再bagging features，即从所有特征维度里面随机选取部分特征用于训练。在后面我们会看到，这两个‘随机’就是随机森林的精髓所在。从随机性来看，bagging技术可以有效的减小方差，即减小过拟合程度。

在scikit-learn中，我们可以很方便的将bagging技术应用于一个分类器/回归器，提高性能：

##### Decision Tree
关于决策树，在这里不展开详细探讨，有机会的话另开一篇博客细说。先简单地举一个例子，以下是一棵分类树，决定下班后是否要观看机器学习公开课。

![Alt text](./images/DT-1.jpg)

###### 随机森林回归

算法介绍：

       随机森林是决策树的集成算法。随机森林包含多个决策树来降低过拟合的风险。随机森林同样具有易解释性、可处理类别特征、易扩展到多分类问题、不需特征缩放等性质。

       随机森林分别训练一系列的决策树，所以训练过程是并行的。因算法中加入随机过程，所以每个决策树又有少量区别。通过合并每个树的预测结果来减少预测的方差，提高在测试集上的性能表现。

       随机性体现：
1.每次迭代时，对原始数据进行二次抽样来获得不同的训练数据。

2.对于每个树节点，考虑不同的随机特征子集来进行分裂。

        除此之外，决策时的训练过程和单独决策树训练过程相同。

        对新实例进行预测时，随机森林需要整合其各个决策树的预测结果。回归和分类问题的整合的方式略有不同。分类问题采取投票制，每个决策树投票给一个类别，获得最多投票的类别为最终结果。回归问题每个树得到的预测结果为实数，最终的预测结果为各个树预测结果的平均值。

        spark.ml支持二分类、多分类以及回归的随机森林算法，适用于连续特征以及类别特征。

In [None]:
我们可以看到从根节点开始往下会有分支，最终会走向叶子节点，得到分类结果。每一个非叶子节点都是一个特征，上图中共有三维特征。
但是决策树的一个劣势就是容易过拟合，下面我们要结合上文提到的bagging技术来中和一下。

##### Random Forest
bagging + decision trees，我们得到了随机森林。将决策树作为base estimator，然后采用bagging技术训练一大堆小决策树，最后将这些小决策树组合起来，这样就得到了一片森林(随机森林)。

http://backnode.github.io/pages/2015/04/23/random-forest.html

In [None]:
召回率和正确率计算
对于一个K元的分类结果，我们可以得到一个K∗K的混淆矩阵,得到的举证结果如下图所示。 

从上图所示的结果中不同的元素表示的含义如下： 
mij ：表示实际分类属于类i，在预测过程中被预测到分类j
对于所有的mij可以概括为四种方式不同类型的数据:

TP（真正）:真正的分类结果属于i预测的结果也属于i，此时对于 mij 而言i=j
FN（假负）:真正的分类结果不属于分类i预测的分类结果也属于分类i
TN (真负) :真正的分类结果属于分类i预测的结果不属于分类i
FP (假正) :真正的分类结果不属于分类i预测的结果属于分类i 
那么召回率的计算公式如下： 

recall=TP/TP+TN

precision=TP/TP+FP

其中：
TPp=mpp:表示对于类p而言预测正确的个体的数目。

TNp=∑ki=1,i≠pmpi:表示对本来属于分类p，预测的结果不属于分类p的个数。

FPp=∑ki=1,i≠pmip:表示对于本来不属于分类p的个体，预测的结果属于分类p的个数。



##### 随机森林中召回率和正确率的计算

In [2]:
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// 加载数据
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
// 将数据随机分配为两份，一份用于训练，一份用于测试
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

In [3]:
// 随机森林训练参数设置
//分类数
val numClasses = 2
// categoricalFeaturesInfo 为空，意味着所有的特征为连续型变量
val categoricalFeaturesInfo = Map[Int, Int]()
//树的个数
val numTrees = 3 
//特征子集采样策略，auto 表示算法自主选取
val featureSubsetStrategy = "auto" 
//纯度计算
val impurity = "gini"
//树的最大层次
val maxDepth = 4
//特征最大装箱数
val maxBins = 32
//训练随机森林分类器，trainClassifier 返回的是 RandomForestModel 对象
val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
 numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)


In [4]:
// 测试数据评价训练好的分类器并计算错误率
val labelAndPreds = testData.map { point =>
 val prediction = model.predict(point.features)
 (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification forest model:\n" + model.toDebugString)

// 将训练后的随机森林模型持久化
model.save(sc, "myModelPath")
//加载随机森林模型到内存
val sameModel = RandomForestModel.load(sc, "myModelPath")

Test Error = 0.07407407407407407
Learned classification forest model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 428 <= 0.0)
     If (feature 442 <= 0.0)
      If (feature 517 <= 0.0)
       Predict: 0.0
      Else (feature 517 > 0.0)
       Predict: 1.0
     Else (feature 442 > 0.0)
      Predict: 0.0
    Else (feature 428 > 0.0)
     Predict: 0.0
  Tree 1:
    If (feature 461 <= 0.0)
     If (feature 323 <= 100.0)
      Predict: 0.0
     Else (feature 323 > 100.0)
      Predict: 1.0
    Else (feature 461 > 0.0)
     Predict: 1.0
  Tree 2:
    If (feature 455 <= 23.0)
     If (feature 234 <= 0.0)
      Predict: 1.0
     Else (feature 234 > 0.0)
      If (feature 290 <= 36.0)
       Predict: 1.0
      Else (feature 290 > 36.0)
       Predict: 0.0
    Else (feature 455 > 23.0)
     Predict: 0.0



In [8]:
  /**
    * @param model 随机森林模型
    * @param data  用于交叉验证的数据集
    * */
  def getMetrics(model: RandomForestModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
    //将交叉验证数据集的每个样本的特征向量交给模型预测,并和原本正确的目标特征组成一个tuple
    val predictionsAndLables = data.map { d =>
      (model.predict(d.features), d.label)
    }
    //将结果交给MulticlassMetrics,其可以以不同的方式计算分配器预测的质量
    new MulticlassMetrics(predictionsAndLables)
  }

In [9]:
 /**
    * 在训练数据集上得到最好的参数组合
    * @param trainData 训练数据集
    * @param cvData 交叉验证数据集
    * */
  def getBestParam(trainData: RDD[LabeledPoint], cvData: RDD[LabeledPoint]): Unit = {
    val evaluations = for (impurity <- Array("gini", "entropy");
                           depth <- Array(1, 20);
                           bins <- Array(10, 300)) yield {
      val model = RandomForest.trainClassifier(trainData, 2, Map[Int, Int](), 20, "auto", impurity, depth, bins)
      val metrics = getMetrics(model, cvData)
      ((impurity, depth, bins), metrics.precision)
    }
    evaluations.sortBy(_._2).reverse.foreach(println)
  }


In [3]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.rdd.RDD

In [11]:
//读取数据
    //val rawData = sc.textFile("hdfs://192.168.1.64:8020/test/mllib/v3.csv")
    val rawData = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
    

In [12]:
val n=1
rawData.take(n).foreach(println)

(0.0,(692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,253.0,159.0,50.0,48.0,238.0,252.0,252.0,252.0,237.0,54.0,227.0,253.0,252.0,239.0,233.0,252.0,57.0,6.0,10.0,60.0,224.0,252.0,253.0,252.0,202.0,84.0,252.0,253.0,122.0,163.0,252.0,252.0,252.0,253.0,252.0,252.0,96.0,189.0,253.0,167.0,51.0,238.0,253.0,253.0,190.0,114.0,253.0,2

In [13]:
   //训练集、交叉验证集和测试集，各占80%，10%和10%
    //10%的交叉验证数据集的作用是确定在训练数据集上训练出来的模型的最好参数
    //测试数据集的作用是评估CV数据集的最好参数
    val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
    trainData.cache()
    cvData.cache()
    testData.cache()

MapPartitionsRDD[268] at randomSplit at <console>:143

In [1]:
 //构建随机森林
    val model = RandomForest.trainClassifier(trainData, 2, Map[Int, Int](), 20, "auto", "gini", 4, 32)
    val metrics = getMetrics(model, cvData)
    println("-----------------------------------------confusionMatrix-----------------------------------------------------")
    //混淆矩阵和模型精确率
    println(metrics.confusionMatrix)
    println("---------------------------------------------precision-------------------------------------------------")
    println(metrics.precision)

    println("-----------------------------------------(precision,recall)---------------------------------------------------")
    //每个类别对应的精确率与召回率
    (0 until 2).map(target => (metrics.precision(target), metrics.recall(target))).foreach(println)
    
    
    //保存模型
    //model.save(sc,"hdfs://192.168.1.64:8020/tmp/RFModel")

    // 将训练后的随机森林模型持久化
    model.save(sc, "RandomForestModel-Save")


-----------------------------------------confusionMatrix-----------------------------------------------------
5.0  0.0  
0.0  3.0  
---------------------------------------------precision-------------------------------------------------
1.0
-----------------------------------------(precision,recall)---------------------------------------------------
(1.0,1.0)
(1.0,1.0)


In [6]:
/**
    * 模拟对新数据进行预测1
    */
   val rawData = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")


  //读取模型
  val rfModel = RandomForestModel.load(sc,"RandomForestModel-Save")
  //进行预测
  val preLabel = rfModel.predict(rawData)
  preLabel.take(10)

Name: Unknown Error
Message: <console>:140: error: value split is not a member of org.apache.spark.mllib.regression.LabeledPoint
           val values = line.split(",").map(_.toDouble)
                             ^
StackTrace: 