In [0]:
!pip install nltk
import nltk
nltk.download('vader_lexicon')
nltk.download('stopwords')

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
import pyspark.sql.functions as F
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer as SIA
from nltk.corpus import stopwords
nltk.download('stopwords')
nltk.download('vader_lexicon')
stop_words = stopwords.words('english')
sia = SIA()

if __name__ == "__main__":

    # create Spark session
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # read the tweet data from socket
    tweet_df= spark \
        .readStream \
        .format("socket") \
        .option("host", 'localhost') \
        .option("port", 7524) \
        .load()

    tweet_df = tweet_df.withColumn("time_st", date_format(current_timestamp(), 'HH:mm'))
    tweet_df = tweet_df.withColumn("time_stamp", date_format(current_timestamp(), 'HH:mm:ss'))
    udfsomefunc = F.udf(sia.polarity_scores, MapType(StringType(), DoubleType()))
    tweet_df = tweet_df.withColumn("sia", F.when(udfsomefunc("value")["compound"] >0.4, 'positive').otherwise(F.when(udfsomefunc("value")["compound"] <-0.4,'negative').otherwise('neutral')))


    # write the above data into memory. consider the entire analysis in all iteration (output mode = complete). and let the trigger runs in every 2 secs.
    writeTweet = tweet_df.writeStream. \
        outputMode("append"). \
        format("memory"). \
        queryName("tweetquery"). \
        start()

  
    print("----- streaming is running -------")

In [0]:
%sql 
select *  
from tweetquery
limit 1000

value,time_st,time_stamp,sia
18f i want someone to treat me as a princess/sweetheart while taking advantage of me..,16:48,16:48:04,positive
who wants to help me practice my aim?,16:48,16:48:04,positive
chat porn gratis,16:48,16:48:04,neutral
a little bit more truth,16:48,16:48:04,neutral
"hey kenny, a message for you",16:48,16:48:04,neutral
do you mind a belly? 🥺,16:48,16:48:04,neutral
faction server,16:48,16:48:04,neutral
🚀 whitebear [ new launch 💎gem 💎 ] - ownership renounced. liquidity locked new coin/token (solid fundamentals),16:48,16:48:04,neutral
i'm never get tired of seeing on this pussy,16:48,16:48:04,neutral
(i could reaaallyy use some feedbacki) biomutant rocket's amnesia ep2,16:48,16:48:04,neutral


In [0]:
%sql
select count(sia), sia from tweetquery group by sia

count(sia),sia
78,positive
259,neutral
30,negative


In [0]:
%sql
select count(*), time_stamp, sia from tweetquery group by time_stamp, sia

count(1),time_stamp,sia
119,16:48:04,neutral
35,16:48:04,positive
13,16:48:04,negative
10,16:48:06,positive
16,16:48:06,neutral
1,16:48:08,positive
6,16:48:08,neutral
2,16:48:10,positive
4,16:48:09,positive
5,16:48:07,positive


In [0]:
%sql
select count(*), time_st, sia from tweetquery group by time_st, sia

count(1),time_st,sia
114,16:48,positive
402,16:48,neutral
50,16:48,negative


In [0]:
df = sqlContext.sql("select * from tweetquery")

df = df.withColumn('word', explode(split(col('value'), ' ')))
df = df.withColumn("word", F.regexp_replace(col("word"), "[^A-Za-z]", ""))
df = df.filter(df.word!="")

negative_comments = df.filter(df.sia == 'negative')\
                       .filter(~df.word.isin(stop_words))\
                       .groupBy('word') \
                       .count() \
                       .sort('count', ascending=False) \
                       .limit(10)

positive_comments = df.filter(df.sia == 'positive')\
                       .filter(~df.word.isin(stop_words))\
                       .groupBy('word') \
                       .count() \
                       .sort('count', ascending=False) \
                       .limit(10)

display(negative_comments)
display(positive_comments)

word,count
fuck,11
ill,8
sexting,8
first,8
add,7
send,6
bad,6
tight,6
looking,6
pussy,6


word,count
free,24
best,18
anime,18
good,18
love,15
gmk,15
play,14
top,14
fun,13
new,13
