In [1]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row


Creating SparkContext as 'sc'
Creating SqlContext as 'sqlContext'
Creating HiveContext as 'hiveContext'
import org.apache.spark.sql.Row

In [3]:
case class Tweets(id: Int, label: Double, source: String, text: String)
val training = sc.textFile("wasb:///training-tweets.csv").zipWithIndex().filter(_._2 > 0).map(line => line._1.split(",")).map(tw => Tweets(tw(0).toInt, tw(1).toDouble, tw(2), tw(3))).toDF() 

val test = sc.textFile("wasb:///test-tweets.csv").zipWithIndex().filter(_._2 > 0).map(line => line._1.split(",")).map(tw => Tweets(tw(0).toInt, tw(1).toDouble, tw(2), tw(3))).toDF()


test: org.apache.spark.sql.DataFrame = [id: int, label: double, source: string, text: string]

In [4]:
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))


pipeline: org.apache.spark.ml.Pipeline = pipeline_b1448efec087

In [5]:
val paramGrid = new ParamGridBuilder()
paramGrid.addGrid(hashingTF.numFeatures, Array(10, 20, 100))
paramGrid.addGrid(lr.regParam, Array(0.1, 0.01))
val grid = paramGrid.build()


grid: Array[org.apache.spark.ml.param.ParamMap] = 
Array({
	hashingTF_6098835c3dcb-numFeatures: 10,
	logreg_d34c68eb22c3-regParam: 0.1
}, {
	hashingTF_6098835c3dcb-numFeatures: 10,
	logreg_d34c68eb22c3-regParam: 0.01
}, {
	hashingTF_6098835c3dcb-numFeatures: 20,
	logreg_d34c68eb22c3-regParam: 0.1
}, {
	hashingTF_6098835c3dcb-numFeatures: 20,
	logreg_d34c68eb22c3-regParam: 0.01
}, {
	hashingTF_6098835c3dcb-numFeatures: 100,
	logreg_d34c68eb22c3-regParam: 0.1
}, {
	hashingTF_6098835c3dcb-numFeatures: 100,
	logreg_d34c68eb22c3-regParam: 0.01
})

In [6]:
val crossvalidator = new CrossValidator()
crossvalidator.setEstimator(pipeline)
crossvalidator.setEvaluator(new BinaryClassificationEvaluator)
crossvalidator.setEstimatorParamMaps(grid)
crossvalidator.setNumFolds(6) 


res6: crossvalidator.type = cv_5ab3eec6a398

In [7]:
val model = crossvalidator.fit(training)

model: org.apache.spark.ml.tuning.CrossValidatorModel = cv_5ab3eec6a398

In [8]:
val modelem = model.transform(test).select("id", "label", "text", "probability", "prediction")
modelem.collect().foreach { case Row(id: Int, label : Double, text: String, probability: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$probability, prediction=$prediction")
  }
(modelem.filter("label = prediction").count().toDouble / modelem.count().toDouble) * 100D


res8: Double = 57.01943844492441

In [9]:
model.avgMetrics

res9: Array[Double] = Array(0.595102078409595, 0.5960110108059493, 0.6160856959751109, 0.618246455638469, 0.6709262902666684, 0.673400823759253)

In [10]:
import org.apache.spark.ml.tuning.TrainValidationSplit

import org.apache.spark.ml.tuning.TrainValidationSplit

In [11]:
case class Tweets(id: Int, label: Double, source: String, text: String)
val tweets = sc.textFile("wasb:///training-tweets.csv").zipWithIndex().filter(_._2 > 0).map(line => line._1.split(",")).map(tw => Tweets(tw(0).toInt, tw(1).toDouble, tw(2), tw(3))).toDF()
val Array(training, test) = tweets.randomSplit(Array(0.9, 0.1), seed = 11L)


training: org.apache.spark.sql.DataFrame = [id: int, label: double, source: string, text: string]
test: org.apache.spark.sql.DataFrame = [id: int, label: double, source: string, text: string]

In [12]:
val trainValidationSplit = new TrainValidationSplit()
trainValidationSplit.setEstimator(pipeline)
trainValidationSplit.setEvaluator(new BinaryClassificationEvaluator)
trainValidationSplit.setEstimatorParamMaps(grid)
trainValidationSplit.setTrainRatio(0.8)


res13: trainValidationSplit.type = tvs_78b1c8a103d7

In [13]:
val model = trainValidationSplit.fit(training)

model: org.apache.spark.ml.tuning.TrainValidationSplitModel = tvs_78b1c8a103d7

In [14]:
model.transform(test).select("features", "label", "prediction").show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(100,[0,4,5,13,40...|  1.0|       0.0|
|(100,[0,1,7,11,39...|  1.0|       0.0|
|(100,[0,33,97],[4...|  0.0|       0.0|
|(100,[0,1,5,12,16...|  0.0|       0.0|
|(100,[0,1,5,6,14,...|  1.0|       0.0|
|(100,[0,4,5,50],[...|  0.0|       0.0|
|(100,[0,5,7,13,28...|  0.0|       0.0|
|(100,[0,7,9,23,52...|  1.0|       0.0|
|(100,[0,1,6,11,12...|  1.0|       0.0|
|(100,[0,17,45,56,...|  0.0|       0.0|
|(100,[0,9,16,18,2...|  0.0|       0.0|
|(100,[0,1,14,25,2...|  0.0|       0.0|
|(100,[0,4,6,14,47...|  0.0|       0.0|
|(100,[0,23,39,43]...|  0.0|       0.0|
|(100,[0,1,30,39,6...|  0.0|       1.0|
|(100,[0,10,16,27,...|  1.0|       1.0|
|(100,[34,43,50,66...|  0.0|       0.0|
|(100,[0,5,8,17,26...|  1.0|       0.0|
|(100,[0,1,6,7,9,1...|  1.0|       1.0|
|(100,[0,8,11,22,2...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 20 rows

In [15]:
model.validationMetrics

res15: Array[Double] = Array(0.610366086135461, 0.6117928406115689, 0.6299418873072471, 0.6308518528447218, 0.6853254281008085, 0.6863389003565411)