In [1]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [2]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

    
val dataPath= "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

In [3]:
    val seed = 12345
    val splits = raw_sentiment.randomSplit(Array(0.60, 0.40), seed)
    val (trainingData, testData) = (splits(0), splits(1))

    trainingData.cache
    testData.cache    

seed = 12345
splits = Array([label: int, tweet: string], [label: int, tweet: string])
trainingData = [label: int, tweet: string]
testData = [label: int, tweet: string]


[label: int, tweet: string]

In [9]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }
import org.apache.spark.ml.regression.{RandomForestRegressor, RandomForestRegressionModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder, CrossValidatorModel, TrainValidationSplit}

val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")


val rf = new RandomForestClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setNumTrees(50)
    
   

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, rf))


tokenizer = tok_1b30f7229375


hashingTF: org.apache.spark.ml.feature.HashingT...


tok_1b30f7229375

In [5]:
val paramGrid = new ParamGridBuilder()
      .addGrid(rf.maxDepth, 5 :: 10 :: 15 :: Nil)
      .addGrid(rf.featureSubsetStrategy, "auto" :: "all" :: Nil)
      .addGrid(rf.impurity, "gini" :: "entropy":: Nil)
      .addGrid(rf.maxBins, 2 :: 5 :: 10 :: Nil)
      .addGrid(rf.numTrees, 10 :: 50 :: Nil)
      .build()

paramGrid = 


Array({
	rfc_32cbc25853a8-featureSubsetStrategy: auto,
	rfc_32cbc25853a8-impurity: gini,
	rfc_32cbc25853a8-maxBins: 2,
	rfc_32cbc25853a8-maxDepth: 5,
	rfc_32cbc25853a8-numTrees: 10
}, {
	rfc_32cbc25853a8-featureSubsetStrategy: auto,
	rfc_32cbc25853a8-impurity: gini,
	rfc_32cbc25853a8-maxBins: 2,
	rfc_32cbc25853a8-maxDepth: 10,
	rfc_32cbc25853a8-numTrees: 10
}, {
	rfc_32cbc25853a8-featureSubsetStrategy: auto,
	rfc_32cbc25853a8-impurity: gini,
	rfc_32cbc25853a8-maxBins: 2,
	rfc_32cbc25853a8-maxDepth: 15,
	rfc_32cbc25853a8-numTrees: 10
}, {
	rfc_32cbc25853a8-featureSubsetStrategy: auto,
	rfc_32cbc25853a8-impurity: entropy,
	rfc_32cbc25853a8-maxBins: 2,
	rfc_32cbc25853a8-maxDepth: 5,
	rfc_32cbc25853a8-numTrees: 10
}, {
	rfc_32cbc25853a8...


In [10]:
val model = pipeline.fit(raw_sentiment).asInstanceOf[PipelineModel]

model = pipeline_71e179e87a45


pipeline_71e179e87a45

In [11]:
model.write.overwrite().save("/home/jovyan/models/spark-ml-model")

In [12]:
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model")

sameModel = pipeline_71e179e87a45


pipeline_71e179e87a45

In [13]:
val predictionsDF = sameModel.transform(raw_sentiment)

predictionsDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|@switchfoot http:...|[@switchfoot, htt...|(1000,[7,14,21,54...|[23.4074465558413...|[0.46814893111682...|       1.0|
|    0|is upset that he ...|[is, upset, that,...|(1000,[170,193,22...|[25.4201345206550...|[0.50840269041310...|       0.0|
|    0|@Kenichan I dived...|[@kenichan, i, di...|(1000,[10,36,77,1...|[27.4328870945118...|[0.54865774189023...|       0.0|
|    0|my whole body fee...|[my, whole, body,...|(1000,[82,191,296...|[25.5825291770754...|[0.51165058354150...|       0.0|
|    0|@nationwideclass ...|[@nationwideclass...|(1000,[18,96,130,...|[27.3528875090967...|[0.54705775018193...|       0.0|
|    0|@

predictionsDF = [label: int, tweet: string ... 5 more fields]


[label: int, tweet: string ... 5 more fields]

In [14]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [15]:
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show

+-------------------+
|  clean_probability|
+-------------------+
| 0.5318510688831722|
| 0.4915973095868992|
| 0.4513422581097638|
|0.48834941645849084|
|  0.452942249818065|
|  0.493782866896605|
| 0.5167079529317546|
| 0.5369361259629176|
| 0.5080084919681612|
|  0.517109733770774|
| 0.5184442891959721|
| 0.4750218302181278|
|0.46671481296273626|
| 0.5127931065947351|
|0.45086031680827354|
| 0.4571477827786316|
|0.48458054869235645|
|  0.514336505851116|
| 0.5516728810021843|
| 0.5423367275955744|
+-------------------+
only showing top 20 rows

