In [21]:
println(s"Current spark version is ${spark.version}")

Current spark version is 3.0.0


In [22]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

val dataPath= "./home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}
dataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath: String = ./home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment: org.apache.spark.sql.DataFrame = [label: int, tweet: string]


In [20]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression


import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row


val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

val lr = new LogisticRegression()
    .setMaxIter(10)
    .setRegParam(0.001)


val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))


import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_91d03ec80849
hashingTF: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_f2da4c434deb, binary=false, numFeatures=1000
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_abfff0af24ac
pipeline: org.apache.spark.ml.Pipeline = pipeline_c1060c847dd2


In [23]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}


val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, rf))

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_4ec0fe5aa0da
hashingTF: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_f67c2e516f83, binary=false, numFeatures=1000
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_5a56f6eb6362
pipeline: org.apache.spark.ml.Pipeline = pipeline_fbfca867f926


In [24]:
val model = pipeline.fit(raw_sentiment)

java.lang.IllegalArgumentException:  indexedFeatures does not exist. Available: label, tweet, words, features

In [25]:
model.write.overwrite().save("./home/jovyan/models/spark-ml-model")

In [26]:
val sameModel = PipelineModel.load("./home/jovyan/models/spark-ml-model")

sameModel: org.apache.spark.ml.PipelineModel = pipeline_6134807c4a40


In [27]:
val predictionsDF = sameModel.transform(raw_sentiment)

predictionsDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|@switchfoot http:...|[@switchfoot, htt...|(1000,[10,21,81,1...|[-1.7093077824653...|[0.15325352111538...|       1.0|
|    0|is upset that he ...|[is, upset, that,...|(1000,[121,193,20...|[0.98452561467195...|[0.72800527368281...|       0.0|
|    0|@Kenichan I dived...|[@kenichan, i, di...|(1000,[17,185,188...|[-0.2839685001973...|[0.42948111509142...|       1.0|
|    0|my whole body fee...|[my, whole, body,...|(1000,[191,330,44...|[0.15558576510068...|[0.53881816712715...|       0.0|
|    0|@nationwideclass ...|[@nationwideclass...|(1000,[32,162,166...|[2.91879438070485...|[0.94876772860820...|       0.0|
|    0|@

predictionsDF: org.apache.spark.sql.DataFrame = [label: int, tweet: string ... 5 more fields]


In [28]:
// predictionsDF.schema
// predictionsDF.describe()
// predictionsDF.stat
predictionsDF.printSchema()

root
 |-- label: integer (nullable = false)
 |-- tweet: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [29]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

import org.apache.spark.sql.functions._
getProbability: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4582/0x0000000841566840@1caba1d6,DoubleType,List(Some(class[value[0]: vector])),None,false,true)


In [30]:
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show

+--------------------+
|   clean_probability|
+--------------------+
|  0.8467464788846125|
| 0.27199472631718885|
|  0.5705188849085758|
| 0.46118183287284903|
| 0.05123227139179534|
|  0.3665977694385508|
| 0.40955379937871456|
|  0.9015464258815026|
|  0.3999186720153602|
|  0.4152371193384253|
|  0.6408024160475245|
|  0.5167122483759258|
|  0.5331218062410026|
|  0.3555001641861205|
|0.053603436925034276|
| 0.10411602709439732|
|  0.3476012052730357|
|  0.4233147373497812|
|   0.620567592228023|
|  0.6259620014111376|
+--------------------+
only showing top 20 rows

