In [0]:
pip install textblob

Python interpreter will be restarted.
Collecting textblob
  Downloading textblob-0.17.1-py2.py3-none-any.whl (636 kB)
Collecting nltk>=3.1
  Downloading nltk-3.8-py3-none-any.whl (1.5 MB)
Collecting click
  Downloading click-8.1.3-py3-none-any.whl (96 kB)
Collecting regex>=2021.8.3
  Downloading regex-2022.10.31-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (772 kB)
Collecting tqdm
  Downloading tqdm-4.64.1-py2.py3-none-any.whl (78 kB)
Installing collected packages: tqdm, regex, click, nltk, textblob
Successfully installed click-8.1.3 nltk-3.8 regex-2022.10.31 textblob-0.17.1 tqdm-4.64.1
Python interpreter will be restarted.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

In [0]:
def preprocessing(lines):
    """Split the raw stream into individual tweets and strip Twitter noise.

    Args:
        lines: DataFrame with a string column ``value`` in which tweets are
            separated by the literal marker ``t_end``.

    Returns:
        DataFrame with a single string column ``word`` holding one cleaned,
        trimmed, non-empty tweet per row.
    """
    words = lines.select(F.explode(F.split(lines.value, "t_end")).alias("word"))

    # Raw strings keep the regex escapes (\S, \w, \b) away from Python's own
    # string-escape processing. Patterns are applied in order.
    cleanup_patterns = (
        r'http\S+',  # URLs
        r'@\w+',     # user mentions
        r'#',        # hashtag marker (the tag text itself is kept)
        r'\bRT\b',   # retweet marker as a whole word only, so "SPORTS" stays intact
        r':',        # colons
    )
    for pattern in cleanup_patterns:
        words = words.withColumn('word', F.regexp_replace('word', pattern, ''))

    # Drop empty rows only AFTER cleaning: a tweet made up solely of a URL or
    # mentions becomes blank once those are stripped.
    words = words.withColumn('word', F.trim('word'))
    words = words.na.replace('', None)
    words = words.na.drop()
    return words

In [0]:
def polarity_detection(text):
    """Return the TextBlob polarity score of ``text``.

    Args:
        text: Tweet text, or ``None`` for a NULL column value.

    Returns:
        ``TextBlob(text).sentiment.polarity``, or ``None`` when ``text`` is
        ``None``.
    """
    # Spark passes NULL column values to Python UDFs as None, and TextBlob
    # rejects non-string input; propagate the NULL instead of failing the task.
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    """Return the TextBlob subjectivity score of ``text``.

    Args:
        text: Tweet text, or ``None`` for a NULL column value.

    Returns:
        ``TextBlob(text).sentiment.subjectivity``, or ``None`` when ``text``
        is ``None``.
    """
    # Spark passes NULL column values to Python UDFs as None, and TextBlob
    # rejects non-string input; propagate the NULL instead of failing the task.
    if text is None:
        return None
    return TextBlob(text).sentiment.subjectivity
def polarity_classification(text):
    """Label ``text`` by the sign of its TextBlob polarity.

    Args:
        text: Tweet text, or ``None`` for a NULL column value.

    Returns:
        ``'Negative'`` when the polarity is below zero, ``'Neutral'`` when it
        is exactly zero, ``'Positive'`` otherwise; ``None`` when ``text`` is
        ``None``.
    """
    # Spark passes NULL column values to Python UDFs as None, and TextBlob
    # rejects non-string input; propagate the NULL instead of failing the task.
    if text is None:
        return None
    polar = TextBlob(text).sentiment.polarity
    if polar < 0:
        return 'Negative'
    if polar == 0:
        return 'Neutral'
    return 'Positive'

def text_classification(words):
    """Append TextBlob sentiment columns to a DataFrame of tweets.

    Args:
        words: DataFrame with a string column ``word`` (one tweet per row).

    Returns:
        The input DataFrame with three added columns: ``polarity`` (double),
        ``sentiment`` (string label from ``polarity_classification``) and
        ``subjectivity`` (double).
    """
    # The two score functions return Python floats, so their UDFs are declared
    # DoubleType: the columns are then numeric and can be aggregated or
    # compared without casting from string.
    polarity_detection_udf = udf(polarity_detection, DoubleType())
    words = words.withColumn("polarity", polarity_detection_udf("word"))

    # The classification is a text label, so StringType is the right type here.
    polarity_classification_udf = udf(polarity_classification, StringType())
    words = words.withColumn("sentiment", polarity_classification_udf("word"))

    subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())
    words = words.withColumn("subjectivity", subjectivity_detection_udf("word"))
    return words

In [0]:
if __name__ == "__main__":
    # Obtain (or reuse) the Spark session for this application.
    spark = (
        SparkSession.builder
        .appName("TwitterSentimentAnalysis")
        .getOrCreate()
    )

    # Streaming source: text read from the socket at 0.0.0.0:5555.
    lines = (
        spark.readStream
        .format("socket")
        .option("host", "0.0.0.0")
        .option("port", 5555)
        .load()
    )

    # Split the stream into cleaned tweets, then attach the polarity,
    # sentiment and subjectivity columns.
    words = preprocessing(lines)
    words = text_classification(words)

    # Sink: append each micro-batch (triggered every 2 seconds) to the
    # in-memory table "tweetstream" so it can be queried with SQL.
    writeTweet = (
        words.writeStream
        .outputMode('append')
        .format("memory")
        .queryName("tweetstream")
        .trigger(processingTime='2 seconds')
        .start()
    )

In [0]:
%sql
-- Read up to 1000 rows from "tweetstream", the in-memory table fed by the streaming query of the same name.
select * from tweetstream LIMIT 1000

word,polarity,sentiment,subjectivity
LIONEL MESSI FINALLY GETS HIS DREAM THE FIFA WORLD CUP! 🐐🏆,0.0,Neutral,1.0
"Messi needed Referees, FIFA, VAR, penalties, fixing, corruption, rigging to…",0.0,Neutral,0.0
2 Robbed Ballon’d ors,0.0,Neutral,0.0
Although FIFA r…,0.0,Neutral,0.0
FIFA World Cup 2022 | “Football In Argentina Is A passion” Argentinian Envoy To NDTV,0.0,Neutral,0.0
Rewatch the GOAT claim the 2022 FIFA World Cup trophy in our 9…,0.0,Neutral,0.0
Happy New Year!,0.4852272727272727,Positive,0.7272727272727273
2x Best FIFA Men's Player,1.0,Positive,0.3
🏆🏆🏆🏆 Champions League,0.0,Neutral,0.0
To enter,0.0,Neutral,0.0
