# Spark Kafka Sentiment Analysis

![](https://static.wixstatic.com/media/f17a52_84852646da5a4e37837a12cb610b2ad8~mv2.png/v1/fill/w_1000,h_673,al_c,usm_0.66_1.00_0.01/f17a52_84852646da5a4e37837a12cb610b2ad8~mv2.png)
[Source](https://www.dataneb.com/post/analyzing-twitter-texts-spark-streaming-example-2)

<div class="jumbotron">
    <center>
        <b>Sentiment Analysis</b> of streaming Twitter data using Flume/Kafka/Spark
    </center>
</div>

![](https://i.imgflip.com/40j9cu.jpg)
[NicsMeme](https://imgflip.com/i/40j9cu)

# Workflow Design

## 1) Model Building

Goal: build a Spark MLlib pipeline that classifies whether a tweet expresses positive sentiment.

> The focus is not on building a highly accurate classifier, but on showing how to apply a trained model to streaming data and return its results.

## 2) Predict and Return Results

Once a new tweet arrives (via Kafka streaming), we pass it through the machine learning pipeline built above and return the sentiment predicted by the model.

# Import Libraries

In [1]:
import findspark
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

![](images/cuofano.png)

# init 1

In [2]:
findspark.find()  # returns the Spark home directory that findspark detects
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TapDataFrame").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/24 15:24:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


![](http://thejoyofgeek.net/wp-content/uploads/2016/08/robotmask.jpg)
[S2E4](http://thejoyofgeek.net/mr-robot-init_1-review-s2e4/)

# Let's Start!

# Training Set
***SentiTUT*** 

http://www.di.unito.it/~tutreeb/sentipolc-evalita16/data.html


In [3]:
# idtwitter	subj	opos	oneg	iro	lpos	lneg	top	text

schema = tp.StructType([
    tp.StructField(name='id',         dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='subjective', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='positive',   dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='negative',   dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='ironic',     dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='lpositive',  dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='lnegative',  dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='top',        dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='text',       dataType=tp.StringType(),  nullable=True)
])

In [4]:
# read the dataset  
training_set = spark.read.csv('../spark/dataset/training_set_sentipolc16.csv',
                         schema=schema,
                         header=True,
                         sep=',')
training_set

DataFrame[id: string, subjective: int, positive: int, negative: int, ironic: int, lpositive: int, lnegative: int, top: int, text: string]

In [5]:
training_set.show(truncate=False)

+------------------+----------+--------+--------+------+---------+---------+---+---------------------------------------------------------------------------------------------------------------------------------------------+
|id                |subjective|positive|negative|ironic|lpositive|lnegative|top|text                                                                                                                                         |
+------------------+----------+--------+--------+------+---------+---------+---+---------------------------------------------------------------------------------------------------------------------------------------------+
|122449983151669248|1         |0       |1       |0     |0        |1        |1  |"Intanto la partita per Via Nazionale si complica. #Saccomanni dice che ""mica tutti sono Mario #Monti"" http://t.co/xPtNz4X7 via @linkiesta"|
|125485104863780865|1         |0       |1       |0     |0        |1        |1  |False illusioni, sgradevoli 

In [6]:
#training_set.show(truncate=False)
training_set.groupBy("positive").count().show()

+--------+-----+
|positive|count|
+--------+-----+
|       1| 2051|
|       0| 5359|
+--------+-----+



In [7]:
# define stage 1: tokenize the tweet text    
stage_1 = RegexTokenizer(inputCol= 'text' , outputCol= 'tokens', pattern= '\\W')
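
To make the tokenization step concrete, here is a minimal plain-Python sketch of what a regex tokenizer with the pattern `\W` roughly does: lowercase the text, split on every non-word character, and drop empty tokens. The helper name `regex_tokenize` is hypothetical, and the `re.ASCII` flag reflects an assumption that the JVM regex treats only ASCII letters, digits and `_` as word characters; it is an illustration, not Spark's implementation.

```python
import re


def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Lowercase the text, split on the pattern, and drop too-short tokens."""
    # re.ASCII is an assumption: it restricts \W to the ASCII definition
    pieces = re.split(pattern, text.lower(), flags=re.ASCII)
    return [piece for piece in pieces if len(piece) >= min_token_length]


tokens = regex_tokenize("Monti mi piace! http://t.co/abc")
print(tokens)  # ['monti', 'mi', 'piace', 'http', 't', 'co', 'abc']
```

Note how the URL is broken into fragments: splitting on `\W` is simple, but it does not treat links or mentions as single tokens.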

In [8]:
# define stage 2: remove the stop words
ita=StopWordsRemover.loadDefaultStopWords("italian")
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words',stopWords=ita)

23/05/24 15:24:19 WARN StopWordsRemover: Default locale set was [en_IT]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
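
The stop-word stage can be pictured with the following plain-Python sketch. The word list here is a tiny hypothetical sample chosen for illustration, not the Italian list that Spark loads, and the matching is case-insensitive by assumption.

```python
# Tiny illustrative stop-word list (an assumption, not Spark's Italian list)
SAMPLE_STOP_WORDS = {"mi", "la", "il", "di", "che", "per", "non"}


def remove_stop_words(tokens, stop_words=SAMPLE_STOP_WORDS):
    """Keep only the tokens that are not in the stop-word set."""
    return [token for token in tokens if token.lower() not in stop_words]


filtered = remove_stop_words(["monti", "mi", "piace"])
print(filtered)  # ['monti', 'piace']
```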


In [9]:
# define stage 3: embed the filtered words as a Word2Vec vector of size 100
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
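
A fitted Word2Vec stage represents each document as the average of its word vectors. The sketch below shows that averaging step in plain Python with a made-up three-dimensional embedding table (`toy_embeddings` is hypothetical). Skipping unknown words is a simplification chosen for this sketch.

```python
def average_word_vectors(tokens, embeddings, size):
    """Average the embeddings of the tokens found in the lookup table."""
    known = [token for token in tokens if token in embeddings]
    total = [0.0] * size
    for token in known:
        for i, value in enumerate(embeddings[token]):
            total[i] += value
    if not known:
        return total  # all-zero vector when no token is known
    return [value / len(known) for value in total]


# Hypothetical embeddings, for illustration only
toy_embeddings = {"monti": [1.0, 0.0, 2.0], "piace": [3.0, 2.0, 0.0]}
doc_vector = average_word_vectors(["monti", "piace", "sconosciuta"], toy_embeddings, 3)
print(doc_vector)  # [2.0, 1.0, 1.0]
```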

In [10]:
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol= 'vector', labelCol= 'positive')
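
As a rough illustration of what the logistic regression stage does at prediction time, the plain-Python sketch below computes a probability from a feature vector and compares it with a decision threshold. The weights and the helper name `predict_positive` are hypothetical. The real model learns its coefficients during `fit`.

```python
import math


def predict_positive(features, weights, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label) for one feature vector."""
    z = sum(w * x for w, x in zip(weights, features)) + intercept
    probability = 1.0 / (1.0 + math.exp(-z))
    label = 1.0 if probability > threshold else 0.0
    return probability, label


# Made-up weights, for illustration only
prob, label_default = predict_positive([0.0, 0.0], [1.5, -2.0], 0.0)
_, label_low_threshold = predict_positive([0.0, 0.0], [1.5, -2.0], 0.0, threshold=0.25)
print(prob, label_default, label_low_threshold)  # 0.5 0.0 1.0
```

Lowering the threshold turns more borderline tweets into positive predictions, which matters for an imbalanced label such as `positive` in this dataset.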

![](https://cdn-images-1.medium.com/max/1600/1*DyD3VP18IV3-lXcKMbyr5w.jpeg)

In [11]:
# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])
pipeline

Pipeline_c626d3e2747c

In [13]:
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(training_set)

23/05/24 15:24:46 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/24 15:24:46 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [14]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary 
# https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/classification/LogisticRegressionSummary.html

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x107938280>

In [15]:
modelSummary.accuracy

0.7437246963562752

![](images/accuracy.jpg)
[DeepLearningNewsAndMemes](https://www.facebook.com/DeepLearningNewsAndMemes/)

In [21]:
# name the single column 'text', the input column the pipeline expects
tweetDf = spark.createDataFrame(["False illusioni, sgradevoli realtà Mario Monti http://t.co/WOmMCITs via @AddToAny"], tp.StringType()).toDF("text")
tweetDf.show(truncate=False)

Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonFunction. Trace:
py4j.Py4JException: Constructor org.apache.spark.api.python.PythonFunction([class [B, class java.util.HashMap, class java.util.ArrayList, class java.lang.String, class java.lang.String, class java.util.ArrayList, class org.apache.spark.api.python.PythonAccumulatorV2]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:180)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:197)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)



In [None]:
pipelineFit.transform(tweetDf).select('text','tokens','prediction').show(truncate=False)

In [None]:
tweetDf = spark.createDataFrame(["Amore"], tp.StringType()).toDF("text")
tweetDf.show(truncate=False)

In [None]:
pipelineFit.transform(tweetDf).select('tokens','prediction').show(truncate=False)

In [None]:
pipelineFit.save("../spark/dataset/model.save")

In [None]:
# Set the model threshold to maximize F-Measure
fMeasure = modelSummary.fMeasureByThreshold
fMeasure.show()

In [None]:
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)')
maxFMeasure.show()

In [None]:
# select the row whose F-Measure equals the maximum computed in maxFMeasure
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure.head()['max(F-Measure)'])
bestThreshold.show()
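
The threshold search above can be reproduced on a toy example in plain Python. The labels and scores below are invented for illustration, and the sketch assumes that a score greater than or equal to the threshold counts as a positive prediction.

```python
def f_measure(labels, scores, threshold):
    """F1 score when every score >= threshold is predicted positive."""
    predicted = [1 if score >= threshold else 0 for score in scores]
    tp = sum(1 for y, p in zip(labels, predicted) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, predicted) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, predicted) if y == 1 and p == 0)
    if tp == 0:
        return 0.0
    return 2 * tp / (2 * tp + fp + fn)


# Invented labels and model scores, for illustration only
labels = [1, 0, 1, 0, 0, 1]
scores = [0.9, 0.4, 0.35, 0.2, 0.6, 0.3]

# Try every distinct score as a candidate threshold and keep the best one
candidates = sorted(set(scores))
best_threshold = max(candidates, key=lambda t: f_measure(labels, scores, t))
best_f = f_measure(labels, scores, best_threshold)
print(best_threshold, best_f)  # 0.3 0.75
```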

In [None]:
# set the decision threshold on the estimator; it takes effect the next time the pipeline is fit
model.setThreshold(0.2457349436145636)

In [None]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary.accuracy

In [None]:
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(training_set)

In [None]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary.accuracy

![](https://i.imgflip.com/40mt0s.jpg)
[NicsMeme](https://imgflip.com/i/40mt0s)

# Another Approach: Naive Bayes

In [None]:
# define stage 3: hash the filtered words into a term-frequency vector with 20 features
hashingTF = HashingTF(inputCol="filtered_words", outputCol="vector", numFeatures=20)
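
The hashing trick behind this stage can be sketched in plain Python: each token is hashed to a bucket index, and the bucket stores how often tokens landing there occur. This sketch uses `zlib.crc32` as a stand-in hash, so the indices will differ from the ones Spark produces. With only 20 buckets, different words can easily collide.

```python
import zlib


def hashing_tf(tokens, num_features=20):
    """Map tokens to {bucket index: term frequency} using the hashing trick."""
    counts = {}
    for token in tokens:
        # crc32 is a stand-in hash chosen for this sketch
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] = counts.get(index, 0.0) + 1.0
    return counts


term_frequencies = hashing_tf(["monti", "piace", "monti"])
print(term_frequencies)
```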

In [None]:
# define stage 4: multinomial Naive Bayes model
modelNaive =  NaiveBayes(smoothing=1.0, modelType="multinomial",featuresCol= 'vector', labelCol= 'positive')
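
For intuition, a multinomial Naive Bayes classifier with additive smoothing can be sketched in a few lines of plain Python. This is a simplified illustration on invented count vectors (the helper names are hypothetical, and the class prior is left unsmoothed), not the Spark implementation.

```python
import math


def train_multinomial_nb(rows, num_features, smoothing=1.0):
    """rows is a list of (label, feature_counts) pairs."""
    model = {}
    for label in sorted({label for label, _ in rows}):
        class_rows = [counts for row_label, counts in rows if row_label == label]
        totals = [sum(counts[i] for counts in class_rows) for i in range(num_features)]
        denominator = sum(totals) + smoothing * num_features
        model[label] = {
            "log_prior": math.log(len(class_rows) / len(rows)),
            "log_theta": [math.log((t + smoothing) / denominator) for t in totals],
        }
    return model


def predict_nb(model, counts):
    """Return the label with the highest log posterior score."""
    def score(label):
        params = model[label]
        likelihood = sum(c * lt for c, lt in zip(counts, params["log_theta"]))
        return params["log_prior"] + likelihood
    return max(model, key=score)


# Invented term-count vectors, for illustration only
rows = [(1, [2, 0, 1]), (1, [3, 0, 0]), (0, [0, 2, 1]), (0, [0, 3, 0])]
nb_model = train_multinomial_nb(rows, num_features=3)
print(predict_nb(nb_model, [1, 0, 0]), predict_nb(nb_model, [0, 1, 0]))  # 1 0
```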

In [None]:
# setup the pipeline
pipelineNaive = Pipeline(stages= [stage_1, stage_2, hashingTF, modelNaive])

# fit the pipeline model with the training data
pipelineNaiveFit = pipelineNaive.fit(training_set)

In [None]:
pipelineNaiveFit

In [None]:
# select example rows to display.
predictions = pipelineNaiveFit.transform(training_set)
predictions.show()

In [None]:
# compute accuracy on the training set (the predictions above come from training_set)
evaluator = MulticlassClassificationEvaluator(labelCol="positive", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Training set accuracy = " + str(accuracy))
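
The accuracy metric itself is simply the share of rows whose prediction equals the label. A minimal plain-Python sketch with invented values:

```python
def accuracy_score(labels, predictions):
    """Fraction of predictions that match the labels."""
    if not labels:
        raise ValueError("labels must not be empty")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Invented labels and predictions, for illustration only
example_accuracy = accuracy_score([1, 0, 0, 1], [1, 0, 1, 1])
print(example_accuracy)  # 0.75
```

On an imbalanced label, accuracy alone can look good even when the minority class is rarely predicted, so it is worth reading it alongside the class counts shown earlier.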

# Yet Another One: TF-IDF with Logistic Regression
https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35


In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)  # minDocFreq=5: ignore terms that appear in fewer than 5 documents
model = LogisticRegression(featuresCol= 'features', labelCol= 'positive',maxIter=100)
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, model])
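
The inverse document frequency used in this pipeline can be illustrated in plain Python. The sketch follows the smoothed formula from the Spark documentation, `log((m + 1) / (df + 1))` for `m` documents, and gives a weight of 0 to terms below `min_doc_freq`. The documents are invented and the function name is hypothetical.

```python
import math


def inverse_document_frequencies(documents, min_doc_freq=0):
    """Return {term: idf} using idf = log((m + 1) / (df + 1))."""
    m = len(documents)
    doc_freq = {}
    for document in documents:
        for term in set(document):
            doc_freq[term] = doc_freq.get(term, 0) + 1
    return {
        term: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }


# Invented tokenized documents, for illustration only
docs = [["monti", "piace"], ["monti", "brutte", "cose"], ["monti"]]
idf_weights = inverse_document_frequencies(docs, min_doc_freq=1)
print(idf_weights["monti"], idf_weights["piace"])
```

A term that occurs in every document, like `monti` here, gets a weight of 0, while rarer terms get larger weights.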


In [None]:
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(training_set)

In [None]:
modelSummary=pipelineFit.stages[-1].summary
modelSummary.accuracy

In [None]:
ss = tp.StructType([tp.StructField("text",tp.StringType(),True)])


In [None]:
tweetDf = spark.createDataFrame(["Mario Monti sul Corriere: la fotografia più illuminante sulla delicata situazione attuale http://t.co/YbuNZMOJ"], tp.StringType()).toDF("text")
tweetDf.select("text").show(truncate=False)
tw2=pipelineFit.transform(tweetDf)
tw2.select("prediction").show()

In [None]:
tweetDf = spark.createDataFrame(["Le 5 sgradevoli realtà di cui Berlusconi dovrebbe rendersi personalmente conto http://t.co/G3u1iF9n Mario Monti non usa mezzi termini"], tp.StringType()).toDF("text")
tweetDf.select("text").show(truncate=False)
tw2=pipelineFit.transform(tweetDf)
tw2.select("prediction").show()

In [None]:
tweetDf = spark.createDataFrame(["Monti mi piace"], tp.StringType()).toDF("text")
tweetDf.select("text").show(truncate=False)
tw2=pipelineFit.transform(tweetDf)
tw2.select("prediction").show()

In [None]:
predictions = pipelineFit.transform(training_set)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="positive")
evaluator.evaluate(predictions)
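
By default `BinaryClassificationEvaluator` reports the area under the ROC curve. One way to read that number: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The plain-Python sketch below computes it that way on invented scores. Spark derives the value from the ROC curve instead, so treat this as an equivalent illustration rather than its implementation.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Invented labels and scores, for illustration only
auc = area_under_roc([1, 0, 1, 0], [0.9, 0.4, 0.35, 0.2])
print(auc)  # 0.75
```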

In [None]:
tweetDf = spark.createDataFrame(["brutte cose"], tp.StringType()).toDF("text")
tweetDf.select("text").show(truncate=False)
tw2=pipelineFit.transform(tweetDf)
tw2.select("prediction").show()

In [None]:
accuracy = predictions.filter(predictions.positive == predictions.prediction).count() / float(training_set.count())
accuracy

In [None]:
spark.stop()

# Bibliography

* https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/
* https://www.kdnuggets.com/2018/02/machine-learning-algorithm-2118.html
* https://towardsdatascience.com/sentiment-analysis-using-logistic-regression-and-naive-bayes-16b806eb4c4b
* http://www.di.unito.it/~tutreeb/sentipolc-evalita16/data.html
* http://www.di.unito.it/~tutreeb/ironita-evalita18/data.html
* https://towardsdatascience.com/sentiment-analysis-and-emotion-recognition-in-italian-using-bert-92f5c8fe8a2
* https://github.com/charlesmalafosse/open-dataset-for-sentiment-analysis
* https://iris.unito.it/retrieve/handle/2318/146318/175020/21_Paper.pdf
* https://aperto.unito.it/retrieve/handle/2318/1698302/496918/Sentiment%20analysis%20on%20Italian%20tweets.pdf
* https://iopscience.iop.org/article/10.1088/1742-6596/1000/1/012130/pdf
* https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
* https://dzone.com/articles/streaming-machine-learning-pipeline-for-sentiment
* https://databricks.com/wp-content/uploads/2015/10/STEP-3-Sentiment_Analysis.html
* https://github.com/P7h/Spark-MLlib-Twitter-Sentiment-Analysis/blob/master/src/main/scala/org/p7h/spark/sentiment/mllib/MLlibSentimentAnalyzer.scala
* https://www.researchgate.net/publication/315913579_An_Apache_Spark_Implementation_for_Sentiment_Analysis_on_Twitter_Data
* https://medium.com/analytics-vidhya/congressional-tweets-using-sentiment-analysis-to-cluster-members-of-congress-in-pyspark-10afa4d1556e
* https://developer.hpe.com/blog/streaming-ml-pipeline-for-sentiment-analysis-using-apache-apis-kafka-spark-and-drill-part-2/
* https://dataespresso.com/en/2017/10/24/comparison-between-naive-bayes-and-logistic-regression/
