In [1]:
// Report the Spark runtime version this notebook session is attached to.
val sparkVersion = spark.version
println(s"Current spark version is $sparkVersion")

Current spark version is 2.4.4


In [2]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

// Column layout of the headerless Sentiment140 training CSV; only `target` and `tweet` are kept below.
val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

val dataPath = "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

// Load the tweets and collapse `target` into a binary `label`:
// target 4 becomes 1, every other target value becomes 0.
val raw_sentiment = spark.read
    .format("csv")
    .option("header", false)
    // The Sentiment140 training file is Latin-1 (ISO-8859-1) encoded, not UTF-8. With Spark's
    // default UTF-8 decoding, non-ASCII bytes in tweets are replaced with U+FFFD.
    // NOTE(review): re-check with `file -i` if a different copy of the dataset is used.
    .option("encoding", "ISO-8859-1")
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label", "tweet")

// Sanity check: row count per label (class balance).
raw_sentiment.groupBy($"label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

In [9]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Map the 0/1 `label` column to the `indexedLabel` double column the classifier trains on.
// alphabetAsc pins "0" -> 0.0 and "1" -> 1.0. The default order (frequencyDesc) sorts labels by
// count. Both classes have exactly 800000 rows in this dataset, so with the default the mapping
// would rest on a tie-break Spark 2.4 does not specify. Code that reads probability index 1 as
// the positive class relies on this mapping being fixed.
val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .setStringOrderType("alphabetAsc")
    .fit(raw_sentiment)

// Split each tweet into lower-cased tokens stored in `words`.
val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

// Hash the tokens into a 1000-dimensional term-frequency vector. Hash collisions are
// expected at this size.
val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

// Index the hashed vector: features with at most 4 distinct values are treated as categorical.
// NOTE(review): per-tweet term counts rarely exceed 4, so most hashed features will likely be
// indexed as categorical rather than continuous. Confirm this is intended.
val featureIndexer = new VectorIndexer()
    .setInputCol(hashingTF.getOutputCol)
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)

// Random forest of 10 trees trained on the indexed label and indexed features.
val rf = new RandomForestClassifier()
    .setLabelCol(labelIndexer.getOutputCol)
    .setFeaturesCol(featureIndexer.getOutputCol)
    .setNumTrees(10)

// Stage order matters: each stage consumes the output column of an earlier stage.
val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, tokenizer, hashingTF, featureIndexer, rf))

labelIndexer = strIdx_1b3fa02c846a
tokenizer = tok_40ec759f5603
hashingTF = hashingTF_25f6b22d261e
featureIndexer = vecIdx_e68fd4c3fe96


rf: org.apache.spark.ml.classification.RandomForestClas...


vecIdx_e68fd4c3fe96

In [10]:
// Fit every pipeline stage on the whole dataset. No hold-out split is made here.
val model: PipelineModel = pipeline.fit(raw_sentiment)

model = pipeline_b26c4efe5270


pipeline_b26c4efe5270

In [11]:
// Persist the fitted pipeline, replacing anything already stored at the target path.
val modelWriter = model.write.overwrite()
modelWriter.save("/home/jovyan/models/spark-ml-model")

In [12]:
// Read the persisted pipeline back from disk (e.g. to check that the saved model loads cleanly).
val sameModel: PipelineModel = PipelineModel.read.load("/home/jovyan/models/spark-ml-model")

sameModel = pipeline_b26c4efe5270


pipeline_b26c4efe5270

In [13]:
// Score every row with the reloaded pipeline. The input is the same data the model was fitted
// on, so these are in-sample predictions, not a held-out evaluation.
val predictionsDF = sameModel.transform(raw_sentiment)

// Preview the first 20 scored rows (wide columns are truncated).
predictionsDF.show(20)

+-----+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|indexedLabel|               words|            features|     indexedFeatures|       rawPrediction|         probability|prediction|
+-----+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|@switchfoot http:...|         0.0|[@switchfoot, htt...|(1000,[7,14,21,54...|(1000,[7,14,21,54...|[4.43931449741300...|[0.44393144974130...|       1.0|
|    0|is upset that he ...|         0.0|[is, upset, that,...|(1000,[170,193,22...|(1000,[170,193,22...|[5.46349235938573...|[0.54634923593857...|       0.0|
|    0|@Kenichan I dived...|         0.0|[@kenichan, i, di...|(1000,[10,36,77,1...|(1000,[10,36,77,1...|[5.60297627210827...|[0.56029762721082...|       0.0|
|    0|my whole body fee...|         0.0|[my, whole,

predictionsDF = [label: int, tweet: string ... 7 more fields]


[label: int, tweet: string ... 7 more fields]

In [14]:
import org.apache.spark.sql.functions._

// UDF returning entry 1 of a probability vector, i.e. the probability of indexedLabel 1.0.
// NOTE(review): this equals the probability of original label 1 only if the StringIndexer
// mapped "1" to 1.0. Confirm the indexer's label order.
val getProbability = udf { probabilityVector: org.apache.spark.ml.linalg.Vector =>
    probabilityVector.apply(1)
}

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [15]:
// Show only the class-1 probability of each prediction, under a readable column name.
predictionsDF
    .select(getProbability(col("probability")).as("clean_probability"))
    .show()

+-------------------+
|  clean_probability|
+-------------------+
| 0.5560685502586992|
|0.45365076406142696|
| 0.4397023727891721|
|0.44955645144499445|
| 0.4995220165195461|
| 0.5115763550216391|
| 0.5115763550216391|
| 0.5262646760589238|
| 0.5115763550216391|
| 0.5053858466280851|
| 0.5115763550216391|
|0.44955645144499445|
| 0.4682668970315146|
| 0.5857066790167775|
|0.48798279641549647|
| 0.5203515396734982|
|   0.49992857683324|
| 0.5115763550216391|
|  0.534385261820869|
| 0.5439844652081389|
+-------------------+
only showing top 20 rows

