In [1]:
// Sanity check: print the Spark version of the session object (`spark`) provided by the notebook kernel.
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [3]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

// Explicit schema for the CSV file. The file is read with header=false,
// so the column names and types below are the ones Spark will use.
val dataSchema = StructType(Seq(
    StructField("target", IntegerType),
    StructField("id", LongType),
    StructField("raw_timestamp", StringType),
    StructField("query_status", StringType),
    StructField("author", StringType),
    StructField("tweet", StringType)
))

// Location of the training CSV inside the notebook container.
val dataPath = "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

// Load the file and keep only what the model needs:
//   label - 1 when target is 4, otherwise 0
//   tweet - the raw tweet text
val raw_sentiment = spark.read
    .schema(dataSchema)
    .option("header", false)
    .csv(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label", "tweet")

// Class balance check: number of rows per label value.
raw_sentiment.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

In [4]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Stage 1: turn the `tweet` text column into a `words` column.
val tokenizer = new Tokenizer().setInputCol("tweet").setOutputCol("words")

// Stage 2: hash the tokenizer's output column into a 1000-dimensional
// `features` vector.
val hashingTF = new HashingTF()
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")
    .setNumFeatures(1000)

// Stage 3: logistic regression, at most 10 iterations, regularization 0.001.
// No feature/label columns are set here, so the estimator's defaults apply
// (presumably `features` and `label`, which is what the earlier stages and
// the input data provide).
val lr = new LogisticRegression().setRegParam(0.001).setMaxIter(10)

// Chain the three stages in execution order.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))


tokenizer = tok_656b05f6de28
hashingTF = hashingTF_6d9f142ab066
lr = logreg_81695bffb5db
pipeline = pipeline_cf53acdf0b0a


pipeline_cf53acdf0b0a

In [18]:
// Fit the whole pipeline (tokenizer -> hashingTF -> lr) on the labelled tweets.
// The result is the fitted pipeline model that the save cell refers to as `model`.
val model = pipeline.fit(raw_sentiment)

model = pipeline_5bb7dd1227bd


pipeline_5bb7dd1227bd

In [5]:
// Persist the fitted pipeline to disk, overwriting whatever is already at the path.
// NOTE: this cell needs `model` from the pipeline.fit cell to exist in the session.
// The recorded output below ("not found: value model") comes from running this
// cell before the fit cell; run the fit cell first, then re-run this one.
model.write.overwrite().save("/home/jovyan/models/spark-ml-model")

Name: Compile Error
Message: <console>:33: error: not found: value model
       model.write.overwrite().save("/home/jovyan/models/spark-ml-model")
       ^

StackTrace: 

In [20]:
// Reload the fitted pipeline from the same path the save cell writes to,
// to check that the persisted model can be read back.
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model")

sameModel = pipeline_5bb7dd1227bd


pipeline_5bb7dd1227bd

In [21]:
// Score the tweets with the reloaded pipeline. Note this is the same DataFrame
// the pipeline was fitted on, so the predictions are not a held-out evaluation.
// The result keeps `label` and `tweet` and adds `words`, `features`,
// `rawPrediction`, `probability` and `prediction`.
val predictionsDF = sameModel.transform(raw_sentiment)

// Preview the scored rows.
predictionsDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|@switchfoot http:...|[@switchfoot, htt...|(1000,[7,14,21,54...|[-0.9010125659401...|[0.28884245921786...|       1.0|
|    0|is upset that he ...|[is, upset, that,...|(1000,[170,193,22...|[1.84195706807746...|[0.86318000204742...|       0.0|
|    0|@Kenichan I dived...|[@kenichan, i, di...|(1000,[10,36,77,1...|[1.56488554961119...|[0.82705328017342...|       0.0|
|    0|my whole body fee...|[my, whole, body,...|(1000,[82,191,296...|[0.22286270195616...|[0.55548620895350...|       0.0|
|    0|@nationwideclass ...|[@nationwideclass...|(1000,[18,96,130,...|[3.23587893775226...|[0.96216236372478...|       0.0|
|    0|@

predictionsDF = [label: int, tweet: string ... 5 more fields]


[label: int, tweet: string ... 5 more fields]

In [24]:
import org.apache.spark.sql.functions._

// UDF that pulls the element at index 1 out of an ML vector and returns it as a
// Double. It is applied to the `probability` column of the predictions, where
// index 1 corresponds to label 1.
val getProbability = udf { probabilities: org.apache.spark.ml.linalg.Vector =>
    probabilities(1)
}

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [27]:
// Show the probability of label 1 for each scored tweet as a plain Double column
// named `clean_probability` (extracted from the `probability` vector by the UDF).
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show

+--------------------+
|   clean_probability|
+--------------------+
|  0.7111575407821329|
| 0.13681999795257868|
|  0.1729467198265728|
|  0.4445137910464909|
|0.037837636275218155|
|  0.4175996770618622|
| 0.47539715389822584|
|  0.9129188858173812|
|  0.3039762575994967|
|  0.5326649357031117|
|  0.4390689703426534|
|  0.5978735997545834|
|  0.3332295562520133|
| 0.49135985941169885|
|0.048285817555274425|
| 0.21764493477475924|
|  0.2622598688640698|
|  0.5282380556537445|
|  0.7151514980011753|
|  0.7542266623347125|
+--------------------+
only showing top 20 rows

