-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
MachineLearning.scala
111 lines (93 loc) · 3.97 KB
/
MachineLearning.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.github.pedrovgs.sparkplayground.exercise11
import com.github.pedrovgs.sparkplayground.{Resources, SparkApp}
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
object MachineLearning extends SparkApp with Resources {

  import sqlContext.implicits._

  // Labeled training data read from a local CSV file. Each tweet becomes a
  // (label, tokens) pair: label 1.0 for tweets annotated "Related and
  // informative", 0.0 otherwise; tokens are a naive lowercase whitespace split
  // of the tweet text. Rows with a null "Informativeness" column are dropped.
  // Cached because it is reused by both the Word2Vec fit and transform below.
  val tweetsData = sqlContext.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(getFilePath("/exercise11/costaRicaEarthquakeTweets.csv"))
    .filter($"Informativeness".isNotNull)
    .rdd
    .map { row =>
      // Binary label derived from the human annotation column.
      val label =
        if (row.getAs[String]("Informativeness") == "Related and informative") 1.0
        else 0.0
      (label, row.getAs[String]("TweetText").toLowerCase.split(" "))
    }
    .toDF("label", "tweet")
    .cache()

  // Learn a Word2Vec embedding over the tokenised tweets; the fitted model maps
  // each tweet (sequence of words) to a single averaged feature vector.
  val word2VectorModel = new Word2Vec()
    .setInputCol("tweet")
    .setOutputCol("features")
    .fit(tweetsData)

  // Compute the feature vectors for every training tweet.
  val result = word2VectorModel
    .transform(tweetsData)

  // SVMWithSGD lives in the old mllib package, so the new ml-package vectors
  // must be converted before building the LabeledPoint RDD.
  val convertedResult = MLUtils.convertVectorColumnsFromML(result, "features")
  val data: RDD[LabeledPoint] = convertedResult.rdd.map { row =>
    val label    = row.getAs[Double]("label")
    val features = row.getAs[DenseVector]("features")
    // LabeledPoint is a case class: the companion apply replaces `new`.
    LabeledPoint(label, features)
  }

  // Split data into training (60%) and test (40%). A fixed seed keeps the
  // split reproducible between runs.
  val splits: Array[RDD[LabeledPoint]] = data.randomSplit(Array(0.6, 0.4), seed = 11L)
  val training: RDD[LabeledPoint] = splits(0).cache()
  val test: RDD[LabeledPoint] = splits(1)

  // Run training algorithm to build the model: a linear SVM trained with
  // stochastic gradient descent.
  val numIterations: Int = 100
  val svmModel = SVMWithSGD.train(training, numIterations)

  // Compute raw scores on the test set as (predicted class, true label) pairs.
  val scoreAndLabels = test.map { point =>
    val score = svmModel.predict(point.features)
    (score, point.label)
  }

  // Evaluate the classifier with threshold-independent metrics.
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  val auPR = metrics.areaUnderPR()
  val auROC = metrics.areaUnderROC()

  pprint.pprintln("Time to save the models into disk and analize the area under PR and ROC:")

  // Persist both models, deleting any leftover output from a previous run first
  // so `save` does not fail on an already-existing path.
  val wordToVectorModelOutputFile = "./outputs/exercise11/word2VectorModel"
  delete(wordToVectorModelOutputFile)
  word2VectorModel.save(wordToVectorModelOutputFile)
  val svmModelOutputFile = "./outputs/exercise11/svmModel"
  delete(svmModelOutputFile)
  svmModel.save(sparkContext, svmModelOutputFile)

  pprint.pprintln("Area under PR = " + auPR)
  pprint.pprintln("Area under ROC = " + auROC)
  pprint.pprintln("Let's classify some tweets!")

  // A handful of unseen (Spanish) tweets used to sanity-check the classifier.
  private val originalTweets: RDD[String] = sparkContext
    .parallelize(
      List(
        "Socorro, un terremoto en San Francisco. Necesitamos ayuda!",
        "Acabo de sentir un terremoto :S",
        "Miley Cyrus mola mil",
        "Vamos a la fiesta de cumpleaños de Pedro que llegamos tarde @davideme @delr3ves"
      ))

  // Tokenise the new tweets exactly like the training data (lowercase + split);
  // without the lowercase step, capitalised tokens would miss the Word2Vec
  // vocabulary built above and silently embed as zero contribution.
  val tweetsToAnalyze = originalTweets
    .map(_.toLowerCase.split(" "))
    .toDF("tweet")
  val earthquakeTweetsResult = word2VectorModel.transform(tweetsToAnalyze)
  val convertedEarthquakeTweetsResult =
    MLUtils.convertVectorColumnsFromML(earthquakeTweetsResult, "features")
  val earthquakeTweetsData: RDD[Vector] =
    convertedEarthquakeTweetsResult.rdd.map(_.getAs[DenseVector]("features"))
  val prediction = svmModel.predict(earthquakeTweetsData)

  // zip pairs elements positionally; valid here because `prediction` is derived
  // from `originalTweets` through maps only, which preserve partitioning/order.
  val predictionResult: RDD[(String, Double)] = originalTweets.zip(prediction)

  pprint.pprintln(
    "The following table shows the class a tweet has to be a real earthquake informative tweet. Class 1 -> earthquake related. Class 0 -> Non related:")
  pprint.pprintln(
    predictionResult
      .map(tuple => "Tweet: " + tuple._1 + " - Class: " + tuple._2)
      .collect())
}