In [1]:
import org.apache.spark.ml.Pipeline

In [2]:


import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF}



In [4]:
import org.apache.spark.sql.SparkSession
// Build (or reuse, via getOrCreate) a local SparkSession running on all available cores.
val spark = SparkSession.builder.
  appName("spark session example").
  master("local[*]").
  getOrCreate()


In [5]:
import spark.implicits._
// One labeled review: its id string, numeric label, and raw review text.
case class DataRecord(
  id: String,
  label: Double,
  text: String
)

In [6]:
// Read the raw data and convert it to a DataFrame of DataRecord rows.
// Each line is expected to be: id <TAB> label <TAB> review text.
// NOTE(review): assumes the file has no header row -- a header line would make toDouble throw.
val data = spark.
    sparkContext.
    textFile("labeledTrainData25000.tsv").map({
    row =>
    // Split into at most 3 fields so any tab inside the review text stays part of the text
    // (a plain split("\t") would silently truncate it), and an empty text field is kept as "".
    val fields = row.split("\t", 3)
    DataRecord(fields(0), fields(1).toDouble, fields(2))
}).toDF()

In [7]:
// Peek at the first row (long cells truncated, which is what show(1) does by default).
data.show(1, truncate = true)

+--------+-----+--------------------+
|      id|label|                text|
+--------+-----+--------------------+
|"5814_8"|  1.0|"With all this st...|
+--------+-----+--------------------+
only showing top 1 row



In [8]:
// Tokenize: lower-case the review text and split it into words.
import org.apache.spark.ml.feature.RegexTokenizer
// With gaps = true the pattern describes the delimiters: any non-word character or any digit,
// so digits never survive as tokens.
val regexTokenizer = new RegexTokenizer().
  setGaps(true).
  setInputCol("text").
  setOutputCol("word").
  setPattern("(\\W|\\d)")

In [9]:
// Remove stop words from each token array (Spark's default stop-word list).
import org.apache.spark.ml.feature.StopWordsRemover
// Note: despite its name, the "stopWord" column holds the words that REMAIN after removal.
val remover = new StopWordsRemover().
  setCaseSensitive(false).
  setInputCol("word").
  setOutputCol("stopWord")

In [10]:


// CountVectorizer term counts -> cv_vector.
// The MLP below reads tfidf_vector, not cv_vector, so this stage can be omitted if cv_vector is not needed.
val tf = new CountVectorizer().
      setInputCol("stopWord").
      setOutputCol("cv_vector").
      setVocabSize(9).  // vocabulary keeps at most the 9 terms with the highest frequency across the corpus
      setMinDF(2) // a term must appear in at least 2 different documents (document count, not total occurrences)



In [11]:


// HashingTF term frequencies: hash every remaining word into one of numFeatures buckets and count hits.
// NOTE(review): 8 buckets for a whole review vocabulary means heavy hash collisions, which plausibly
// explains the ~0.55 accuracy this notebook reports. The MLP input layer must equal this value.
val hashingTF = new HashingTF().
    setNumFeatures(8).
    setInputCol("stopWord").
    setOutputCol("htf_vector")



In [12]:


// IDF: rescale the HashingTF counts by inverse document frequency, yielding TF-IDF vectors.
// The input column is read from hashingTF so the two stages stay wired together if it is renamed.
val idf = new IDF().
    setOutputCol("tfidf_vector").
    setInputCol(hashingTF.getOutputCol)



In [14]:


import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator



// Classifier stage: a multilayer perceptron (MLP).
// Note that the features column chosen here is tfidf_vector.
// Three input vectors are available -- cv_vector (CountVectorizer), htf_vector (HashingTF) and
// tfidf_vector (TF-IDF). Which one is best? Decide by the resulting accuracy.
//
// layers = (input size, hidden 15, hidden 14, output size).
// The input size must equal the feature-vector length. Both htf_vector and tfidf_vector have
// hashingTF.getNumFeatures entries, so derive it rather than hard-coding 8 and drifting out of
// sync when numFeatures changes.
// NOTE(review): if featuresCol is switched to cv_vector, layers(0) must instead be the fitted
// CountVectorizer vocabulary size (at most its vocabSize).
// The output layer has 2 neurons, one per label class (the labels in this data are 0.0 and 1.0).
val layers = Array[Int](hashingTF.getNumFeatures, 15, 14, 2)
val trainer = new MultilayerPerceptronClassifier().
  setLayers(layers).
  setLabelCol("label").
  setFeaturesCol("tfidf_vector").
  setPredictionCol("prediction").
  setBlockSize(128).
  setSeed(1234L).  // fixed random seed for reproducible training
  setMaxIter(500)



In [16]:


// Chain every stage into a single Pipeline; stages run in the order listed.
val pipeline = new Pipeline().
  setStages(Array(
    regexTokenizer, // text     -> word
    remover,        // word     -> stopWord
    tf,             // stopWord -> cv_vector (not used by the MLP)
    hashingTF,      // stopWord -> htf_vector
    idf,            // htf_vector -> tfidf_vector
    trainer         // tfidf_vector -> prediction
  ))



In [17]:


// Split the data randomly into a training set (~70%) and a test set (~30%).
// A fixed seed (the same one given to the MLP trainer) makes the split -- and therefore the
// reported accuracy -- reproducible across runs.

val Array(training, test) = data.randomSplit(Array(0.7, 0.3), 1234L)

In [18]:


// Fit all pipeline stages on the training set, producing a fitted PipelineModel.
val mlp_model = pipeline.
  fit(training)



In [19]:


// Run the fitted pipeline on the held-out test set to predict its labels.
val testResult = mlp_model.
  transform(test)



In [20]:
// Show the first 20 predictions (long cells truncated), same as the no-argument show().
testResult.show(20, truncate = true)

+---------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|       id|label|                text|                word|            stopWord|           cv_vector|          htf_vector|        tfidf_vector|prediction|
+---------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    "0_3"|  0.0|"Story of a man w...|[story, of, a, ma...|[story, man, unna...|(9,[5,6,7,8],[1.0...|(8,[0,1,2,3,4,5,6...|(8,[0,1,2,3,4,5,6...|       1.0|
|"10000_8"|  1.0|"Homelessness (or...|[homelessness, or...|[homelessness, ho...|(9,[0,2,3,4],[8.0...|(8,[0,1,2,3,4,5,6...|(8,[0,1,2,3,4,5,6...|       0.0|
|"10002_7"|  1.0|"This is easily t...|[this, is, easily...|[easily, underrat...|       (9,[2],[2.0])|(8,[0,1,2,3,4,5,6...|(8,[0,1,2,3,4,5,6...|       0.0|
|"10004_3"|  0.0|"\"It appears tha...|[it, appears, tha...|[appears, m

In [21]:


// Compute accuracy on the test predictions.
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val predictionAndLabels = testResult.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator().
  setLabelCol("label").
  setPredictionCol("prediction").
  setMetricName("accuracy")
println(s"Accuracy:${evaluator.evaluate(predictionAndLabels)}")



Accuracy:0.5470912266559184
