# Spark Kafka Sentiment Analysis

![](https://static.wixstatic.com/media/f17a52_84852646da5a4e37837a12cb610b2ad8~mv2.png/v1/fill/w_1000,h_673,al_c,usm_0.66_1.00_0.01/f17a52_84852646da5a4e37837a12cb610b2ad8~mv2.png)
[Source](https://www.dataneb.com/post/analyzing-twitter-texts-spark-streaming-example-2)

<div class="jumbotron">
    <center>
        <b>Sentiment Analysis</b> of streaming Twitter data using Flume/Kafka/Spark
    </center>
</div>

![](https://i.imgflip.com/40j9cu.jpg)
[NicsMeme](https://imgflip.com/i/40j9cu)

# Workflow Design

## 1) Model Building

Goal: Build a Spark MLlib pipeline that classifies whether a tweet expresses positive sentiment.

> The focus is not on building a very accurate classification model, but on showing how to apply any model to streaming data and return its results.

## 2) Initialize Spark Streaming 

Once the model is built, we need to define the source from which to read the tweets:

### Kafka

## 3) Stream Data

Start the stream: the Spark Streaming API collects incoming data and delivers it in batches at a specified interval.
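
A minimal pure-Python sketch of the micro-batch idea (not Spark code): records that arrive within the same interval are grouped and handed over together. The `micro_batches` helper, the timestamps, and the 3-second interval are illustrative assumptions.

```python
from collections import defaultdict


def micro_batches(events, batch_interval):
    """Group (timestamp, record) pairs into batches of `batch_interval` seconds."""
    batches = defaultdict(list)
    for timestamp, record in events:
        # Records whose timestamps fall in the same interval share a batch index.
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(record)
    # Intervals without any record are simply skipped in this sketch.
    return [batches[index] for index in sorted(batches)]


events = [(0.5, "tweet a"), (1.2, "tweet b"), (3.1, "tweet c"), (7.9, "tweet d")]
for batch in micro_batches(events, batch_interval=3):
    print(batch)
```

Each printed list corresponds to one batch that the model would score in a single pass.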

## 4) Predict and Return Results

Once we receive the tweet text, we pass it through the machine learning pipeline we built and return the sentiment predicted by the model.

# Import Libraries

In [30]:
import findspark
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

![](../images/cuofano.png)

# init 1

In [2]:
findspark.find()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TapDataFrame").getOrCreate()
spark

![](http://thejoyofgeek.net/wp-content/uploads/2016/08/robotmask.jpg)
[S2E4](http://thejoyofgeek.net/mr-robot-init_1-review-s2e4/)

 # Let's Start!

# Trainset 
***SentiTUT*** 

http://www.di.unito.it/~tutreeb/sentipolc-evalita16/data.html


![](https://www.visualist.in/assets/images/algorithms_can.jpg)
[Textblob](https://www.visualist.in/sentiment-analysis-with-textblob/)

In [4]:
# idtwitter	subj	opos	oneg	iro	lpos	lneg	top	text

# map the nine columns of the CSV to named, typed fields
schema = tp.StructType([
    tp.StructField(name='id',         dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='subjective', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='positive',   dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='negative',   dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='ironic',     dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='lpositive',  dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='lnegative',  dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='top',        dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='tweet',      dataType=tp.StringType(),  nullable=True)
])

In [5]:
# read the dataset  
training_set = spark.read.csv('../spark/dataset/training_set_sentipolc16.csv',
                         schema=schema,
                         header=True,
                         sep=',')

training_set.show(truncate=False)


+------------------+----------+--------+--------+------+---------+---------+---+---------------------------------------------------------------------------------------------------------------------------------------------+
|id                |subjective|positive|negative|ironic|lpositive|lnegative|top|tweet                                                                                                                                        |
+------------------+----------+--------+--------+------+---------+---------+---+---------------------------------------------------------------------------------------------------------------------------------------------+
|122449983151669248|1         |0       |1       |0     |0        |1        |1  |"Intanto la partita per Via Nazionale si complica. #Saccomanni dice che ""mica tutti sono Mario #Monti"" http://t.co/xPtNz4X7 via @linkiesta"|
|125485104863780865|1         |0       |1       |0     |0        |1        |1  |False illusioni, sgradevoli 

![](https://www.meme-arsenal.com/memes/a05a53a96e890dee5a52d1156c01eb06.jpg)

In [6]:
# define stage 1: tokenize the tweet text    
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')
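
As a rough illustration of what this stage does, here is a pure-Python sketch under the assumption that the tokenizer lowercases the text, splits on the pattern, and drops empty tokens. The `regex_tokenize` helper is hypothetical, and `re.ASCII` is used to mimic a Java-style `\W`, which treats only `[a-zA-Z0-9_]` as word characters.

```python
import re


def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Lowercase, split on `pattern`, and drop tokens shorter than `min_token_length`."""
    # re.ASCII makes \W match every character outside [a-zA-Z0-9_], accents included.
    parts = re.split(pattern, text.lower(), flags=re.ASCII)
    return [token for token in parts if len(token) >= min_token_length]


print(regex_tokenize("False illusioni, sgradevoli realtà Mario Monti"))
# ['false', 'illusioni', 'sgradevoli', 'realt', 'mario', 'monti']
```

Under that assumption, accented letters act as separators ("realtà" becomes "realt"), which matters for Italian tweets.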

In [7]:
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')
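
When no stop-word list is passed, `StopWordsRemover` falls back to its English defaults, while the tweets here are Italian; PySpark also ships an Italian list through `StopWordsRemover.loadDefaultStopWords("italian")`. The pure-Python sketch below only illustrates the filtering step; the `ITALIAN_STOP_WORDS` set is a tiny hand-picked sample, not Spark's list.

```python
# Tiny illustrative subset of Italian stop words (an assumption, not Spark's list).
ITALIAN_STOP_WORDS = {"la", "il", "per", "si", "che", "di", "e", "sono"}


def remove_stop_words(tokens, stop_words=ITALIAN_STOP_WORDS):
    """Keep only the tokens that are not stop words (case-insensitive)."""
    return [token for token in tokens if token.lower() not in stop_words]


tokens = ["intanto", "la", "partita", "per", "via", "nazionale", "si", "complica"]
print(remove_stop_words(tokens))
# ['intanto', 'partita', 'via', 'nazionale', 'complica']
```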

In [8]:
# define stage 3: create a word vector of the size 100
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
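
To turn a whole tweet into one vector, the learned word vectors are averaged. The sketch below shows that idea in pure Python, assuming that words without a learned vector contribute zeros and that the sum is divided by the total token count. The `average_word_vectors` helper and the 2-dimensional embeddings are made up for illustration.

```python
def average_word_vectors(tokens, embeddings, vector_size):
    """Average the vectors of the tokens; unknown tokens contribute zeros."""
    if not tokens:
        return [0.0] * vector_size
    total = [0.0] * vector_size
    for token in tokens:
        vector = embeddings.get(token)
        if vector is not None:
            total = [t + v for t, v in zip(total, vector)]
    # Divide by the full token count, so unknown words pull the average toward zero.
    return [t / len(tokens) for t in total]


# Hypothetical 2-dimensional embeddings; the real stage uses vectorSize=100.
embeddings = {"mario": [3.0, 0.0], "monti": [0.0, 6.0]}
print(average_word_vectors(["mario", "monti", "bene"], embeddings, vector_size=2))
# [1.0, 2.0]
```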

In [9]:
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol= 'vector', labelCol= 'positive')

![](https://cdn-images-1.medium.com/max/1600/1*DyD3VP18IV3-lXcKMbyr5w.jpeg)

In [10]:
# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

In [11]:
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(training_set)

In [15]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary 
# https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/classification/LogisticRegressionSummary.html

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x7f2e218f4fd0>

In [16]:
modelSummary.accuracy

0.7388663967611336

![](https://scontent-mxp1-1.xx.fbcdn.net/v/t1.0-9/30414523_355257768301643_808666797657030656_n.jpg?_nc_cat=104&_nc_sid=730e14&_nc_ohc=_t_RW0C5ORAAX_NKEoW&_nc_ht=scontent-mxp1-1.xx&oh=bcbf3e937974d48c17093dd8c639e130&oe=5ED9DC6A)
[DeepLearningNewsAndMemes](https://www.facebook.com/DeepLearningNewsAndMemes/)

In [17]:
tweetDf = spark.createDataFrame(["False illusioni, sgradevoli realtà Mario Monti http://t.co/WOmMCITs via @AddToAny"], tp.StringType()).toDF("tweet")
tweetDf.show(truncate=False)

+---------------------------------------------------------------------------------+
|tweet                                                                            |
+---------------------------------------------------------------------------------+
|False illusioni, sgradevoli realtà Mario Monti http://t.co/WOmMCITs via @AddToAny|
+---------------------------------------------------------------------------------+



In [18]:
pipelineFit.transform(tweetDf).select('tweet','prediction').show()

+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
|False illusioni, ...|       0.0|
+--------------------+----------+



In [19]:
tweetDf = spark.createDataFrame(["Mario Monti bene"], tp.StringType()).toDF("tweet")
tweetDf.show(truncate=False)

+----------------+
|tweet           |
+----------------+
|Mario Monti bene|
+----------------+



In [20]:
pipelineFit.transform(tweetDf).select('tweet','prediction').show()

+----------------+----------+
|           tweet|prediction|
+----------------+----------+
|Mario Monti bene|       1.0|
+----------------+----------+



In [22]:
pipelineFit.save("../spark/dataset/model.save")

In [23]:
# Set the model threshold to maximize F-Measure
fMeasure = modelSummary.fMeasureByThreshold
fMeasure.show()

+-------------------+-------------------+
|          threshold|          F-Measure|
+-------------------+-------------------+
| 0.7612165443187658|0.07710171853228055|
| 0.6502257898972215|0.11759425493716337|
| 0.5775497755236252|0.15471534115601915|
| 0.5327163253867352|0.18365627632687445|
| 0.5056779832305835|0.20923579893747446|
| 0.4804979935548759| 0.2334259626836046|
|0.46019804391029195|0.25376012340917853|
|0.44469700717694366| 0.2708177044261066|
| 0.4301320873422842|0.28258488499452356|
|0.41567152448667005|0.29384560654571323|
| 0.4056789732083148|0.30870620881026706|
|0.39613317325565656|0.32284263959390863|
|0.38747512389971067|  0.333553500660502|
| 0.3809560295459846| 0.3490322580645161|
|0.37333830333404583| 0.3593947036569987|
| 0.3659479673235152|0.37119113573407203|
|0.36045581545361804| 0.3779717123081553|
| 0.3561395538563056| 0.3858615611192931|
|0.35047584362556866|0.39573117969426014|
| 0.3445195120670471| 0.4063294716021475|
+-------------------+-------------------+

In [24]:
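# find the threshold with the highest F-Measure
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
# set it on the estimator; the already fitted pipeline keeps its old threshold until it is refit
model.setThreshold(bestThreshold)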
bestThreshold

0.21941143502317084
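
What the threshold search does can be sketched in pure Python: compute the F-Measure (F1) of the positive class at each candidate threshold and keep the best one. The helpers, labels, and scores below are illustrative assumptions, not values from the model. Note that the threshold maximizing F-Measure is not necessarily the one maximizing accuracy.

```python
def f_measure(labels, scores, threshold):
    """F1 of the positive class when predicting 1 for score >= threshold."""
    predictions = [1 if score >= threshold else 0 for score in scores]
    tp = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 0)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def best_threshold(labels, scores):
    """Try every distinct score as a threshold and return the one with the highest F1."""
    candidates = sorted(set(scores), reverse=True)
    return max(candidates, key=lambda t: f_measure(labels, scores, t))


labels = [1, 0, 1, 0, 0, 1]                # made-up ground truth
scores = [0.9, 0.6, 0.55, 0.4, 0.2, 0.35]  # made-up predicted probabilities
print(best_threshold(labels, scores))
# 0.35
```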

In [25]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary.accuracy

0.7388663967611336

In [26]:
# refit the pipeline on the training data so that the new threshold is applied
pipelineFit = pipeline.fit(training_set)

In [27]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary.accuracy

0.5394062078272605

![](https://i.imgflip.com/40mt0s.jpg)
[NicsMeme](https://imgflip.com/i/40mt0s)

# Another Approach: Naive Bayes

In [28]:
# define stage 3: hash the filtered words into a term-frequency vector with 20 features
hashingTF = HashingTF(inputCol="filtered_words", outputCol="vector", numFeatures=20)
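
The hashing trick behind this stage can be sketched in a few lines: each token is hashed to a bucket index and the bucket's count is incremented. `zlib.crc32` is only a stand-in here (Spark uses a different hash function), and `hashing_tf` is a hypothetical helper.

```python
import zlib


def hashing_tf(tokens, num_features=20):
    """Map tokens to a fixed-length vector of term counts via hashing."""
    counts = [0] * num_features
    for token in tokens:
        # Stand-in hash; the bucket index is the hash value modulo the vector size.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return counts


vector = hashing_tf(["mario", "monti", "bene", "mario"])
print(len(vector), sum(vector))
# 20 4
```

With only 20 buckets, different words will often collide in the same bucket, which limits how much the classifier can learn from the vector.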

In [31]:
# define stage 4: Naive Bayes model
modelNaive =  NaiveBayes(smoothing=1.0, modelType="multinomial",featuresCol= 'vector', labelCol= 'positive')
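
For intuition, here is a textbook multinomial Naive Bayes with additive smoothing in pure Python. It is a simplified sketch, not Spark's implementation (which may differ in details such as how priors are smoothed); the helper names and the toy count vectors are assumptions.

```python
import math


def train_multinomial_nb(vectors, labels, smoothing=1.0):
    """Return per-class log priors and smoothed log feature likelihoods."""
    num_features = len(vectors[0])
    log_prior, log_likelihood = {}, {}
    for c in sorted(set(labels)):
        rows = [v for v, y in zip(vectors, labels) if y == c]
        log_prior[c] = math.log(len(rows) / len(vectors))
        totals = [sum(row[j] for row in rows) for j in range(num_features)]
        # Additive smoothing keeps unseen features from getting zero probability.
        denominator = sum(totals) + smoothing * num_features
        log_likelihood[c] = [math.log((t + smoothing) / denominator) for t in totals]
    return log_prior, log_likelihood


def predict(vector, log_prior, log_likelihood):
    """Pick the class with the highest log posterior score."""
    scores = {
        c: log_prior[c] + sum(x * w for x, w in zip(vector, log_likelihood[c]))
        for c in log_prior
    }
    return max(scores, key=scores.get)


vectors = [[2, 0, 1], [1, 0, 0], [0, 3, 1], [0, 1, 2]]  # toy term counts
labels = [1, 1, 0, 0]
log_prior, log_likelihood = train_multinomial_nb(vectors, labels)
print(predict([1, 0, 0], log_prior, log_likelihood))
# 1
```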

In [32]:
# setup the pipeline
pipelineNaive = Pipeline(stages= [stage_1, stage_2, hashingTF, modelNaive])

# fit the pipeline model with the training data
pipelineNaiveFit = pipelineNaive.fit(training_set)

In [33]:
pipelineNaiveFit

PipelineModel_172ce70551b8

In [34]:
# select example rows to display.
predictions = pipelineNaiveFit.transform(training_set)
predictions.show()

+------------------+----------+--------+--------+------+---------+---------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                id|subjective|positive|negative|ironic|lpositive|lnegative|top|               tweet|              tokens|      filtered_words|              vector|       rawPrediction|         probability|prediction|
+------------------+----------+--------+--------+------+---------+---------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|122449983151669248|         1|       0|       1|     0|        0|        1|  1|"Intanto la parti...|[intanto, la, par...|[intanto, la, par...|(20,[0,1,2,3,4,5,...|[-65.171974168852...|[0.72519726437760...|       0.0|
|125485104863780865|         1|       0|       1|     0|        0|        1|  1|False illusioni, ...|[false, illusioni...|[false

In [35]:
# compute accuracy (the predictions above come from the training set, not from a held-out test set)
evaluator = MulticlassClassificationEvaluator(labelCol="positive", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.7226720647773279
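
Since the predictions were computed on `training_set`, this figure is a training accuracy. A held-out split gives a more honest estimate; Spark DataFrames offer `randomSplit` for that. The pure-Python sketch below shows the idea with hypothetical helpers and toy data.

```python
import random


def train_test_split(rows, test_fraction=0.2, seed=42):
    """Shuffle a copy of the rows and cut off the last `test_fraction` as test data."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    cut = len(shuffled) - int(len(shuffled) * test_fraction)
    return shuffled[:cut], shuffled[cut:]


def accuracy(labels, predictions):
    """Fraction of predictions that match the labels."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


train_rows, test_rows = train_test_split(list(range(10)))
print(len(train_rows), len(test_rows))
# 8 2
print(accuracy([1, 0, 1, 1], [1, 0, 0, 1]))
# 0.75
```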


In [36]:
spark.stop()

# Putting It All Together

```bash
# Start ZooKeeper
# Start Kafka Server
# Start Spark Ac
./sparkSubmitPython.sh twitter_stream_sentiment.py org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5
 ```
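
The content of `twitter_stream_sentiment.py` is not shown here. The following is only a sketch of what such a script could look like with the Kafka 0-8 integration named in the command above, not the actual script. The topic name, the broker address, the batch interval, the application name, and the assumption that each Kafka message value is the plain tweet text are all hypothetical; the model path is the one used when saving the pipeline.

```python
def tweet_text(record):
    """Extract the tweet text from a Kafka (key, value) record."""
    _key, value = record
    return value.strip()


def run_stream(topic="tweets", brokers="localhost:9092", batch_seconds=3):
    """Score tweets from Kafka with the saved pipeline (hypothetical defaults)."""
    # Imports are local so that tweet_text can be used without a Spark installation.
    from pyspark import SparkContext
    from pyspark.ml import PipelineModel
    from pyspark.sql import Row, SparkSession
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="TwitterStreamSentiment")
    spark = SparkSession(sc)
    ssc = StreamingContext(sc, batch_seconds)
    model = PipelineModel.load("../spark/dataset/model.save")

    def predict(rdd):
        # Skip intervals in which no tweet arrived.
        if rdd.isEmpty():
            return
        tweets = spark.createDataFrame(rdd.map(lambda text: Row(tweet=text)))
        model.transform(tweets).select("tweet", "prediction").show(truncate=False)

    stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers}
    )
    stream.map(tweet_text).foreachRDD(predict)
    ssc.start()
    ssc.awaitTermination()
```

Calling `run_stream()` at the end of the script would start the job once ZooKeeper and Kafka are running.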


# Biblio

* https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/
* https://www.kdnuggets.com/2018/02/machine-learning-algorithm-2118.html