In [None]:
import findspark

In [None]:
import os

# Prefer an explicitly configured SPARK_HOME so the notebook is not tied to one
# machine's directory layout; fall back to the original local install path
# when the variable is unset or empty.
findspark.init(os.environ.get('SPARK_HOME') or '/home/rathin/spark-3.0.3-bin-hadoop2.7')

In [None]:
import json
import pickle
import sys
from operator import add

import nltk
import requests
from nltk.corpus import stopwords
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from textblob import TextBlob

from sentiment_model import TweetsClassifier

In [None]:
filename = 'finalized_classifier.sav'
# NOTE(security): pickle.load can execute arbitrary code embedded in the file,
# so only load a classifier file that comes from a trusted source.
# The `with` block closes the file handle as soon as the model has been read
# instead of leaving it open for the lifetime of the kernel.
with open(filename, 'rb') as model_file:
    sentiment_classifier = pickle.load(model_file)

In [None]:
def sendTopWords(counts, url, timeout=5):
    """Post the first ten (word, count) pairs of every non-empty batch to ``url``.

    Args:
        counts: Stream object exposing ``foreachRDD``; each RDD it yields is
            expected to hold ``(word, count)`` pairs. The pairs are sent in the
            order ``rdd.take(10)`` returns them, so any ranking must already
            have been applied by the caller.
        url: Endpoint that receives the form-encoded POST.
        timeout: Seconds to wait for the HTTP request before giving up.

    The payload has two fields, ``words`` and ``counts``, each holding the
    ``str()`` of a Python list. Empty batches are skipped. A failed request is
    reported on stdout and otherwise ignored, so an unreachable endpoint does
    not abort the stream.
    """
    def takeAndSend(time, rdd):
        if rdd.isEmpty():
            return

        word_counts = rdd.take(10)

        # Build lists (not tuples) so the str() form sent to the server keeps
        # its "[...]" shape.
        words = [word for (word, _) in word_counts]
        values = [count for (_, count) in word_counts]

        json_data = {'words': str(words), 'counts': str(values)}
        print(json_data)

        try:
            # Without a timeout a stalled server would block this batch forever.
            requests.post(url, data=json_data, timeout=timeout)
        except requests.exceptions.RequestException as error:
            print('Could not post top words to %s: %s' % (url, error))

    counts.foreachRDD(takeAndSend)

def sendTweetSentiments(sentiments, url, timeout=5):
    """Post the sentiment tallies of every non-empty batch to ``url``.

    Args:
        sentiments: Stream object exposing ``foreachRDD``; the first element of
            each RDD must have the shape
            ``(key, (total, (positive, neutral, negative)))``.
        url: Endpoint that receives the form-encoded POST.
        timeout: Seconds to wait for the HTTP request before giving up.

    Only the first element of each RDD is read; its key is ignored. The payload
    fields are ``positive``, ``neutral``, ``negative`` and ``total``. Empty
    batches are skipped. A failed request is reported on stdout and otherwise
    ignored, so an unreachable endpoint does not abort the stream.
    """
    def takeAndSend(time, rdd):
        if rdd.isEmpty():
            return

        (_, (total, (pos, neutral, neg))) = rdd.first()

        json_data = {'positive': pos, 'neutral': neutral, 'negative': neg, 'total': total}
        print(json_data)

        try:
            # Without a timeout a stalled server would block this batch forever.
            requests.post(url, data=json_data, timeout=timeout)
        except requests.exceptions.RequestException as error:
            print('Could not post tweet sentiments to %s: %s' % (url, error))

    sentiments.foreachRDD(takeAndSend)
    
def getSentiment(text):
    """Classify ``text`` and return a three-slot indicator tuple.

    The module-level ``sentiment_classifier`` is asked to classify a
    one-element list holding ``text``. When the first returned label equals
    ``1`` the result is ``(1, 0, 0)``; for any other label it is ``(0, 0, 1)``.
    The middle slot is never set by this function.
    """
    label = sentiment_classifier.classify([text])[0]
    return (1, 0, 0) if label == 1 else (0, 0, 1)
    
def sendTweetSentimentsFromStream(kvs, url):
    """Tally tweet sentiments per batch and forward the totals to ``url``.

    Every element of ``kvs`` is parsed as JSON and its ``"text"`` is scored with
    ``getSentiment``. All tweets of a batch are then folded under the single key
    ``'count'`` into ``(tweet_total, (slot0, slot1, slot2))``, where the slots
    are the element-wise sums of the ``getSentiment`` tuples. The resulting
    stream is printed with ``pprint`` and handed to ``sendTweetSentiments``.
    """
    def scoreTweet(tweet):
        # The screen name is looked up although nothing downstream reads it,
        # so a record without ["user"]["screen_name"] raises KeyError here.
        return (tweet["user"]["screen_name"], tweet["text"], getSentiment(tweet["text"]))

    def asCountRecord(scored):
        # One record per tweet: a count of 1 plus that tweet's sentiment tuple.
        return ('count', (1, scored[2]))

    def mergeTallies(left, right):
        left_votes, right_votes = left[1], right[1]
        merged_votes = (left_votes[0] + right_votes[0],
                        left_votes[1] + right_votes[1],
                        left_votes[2] + right_votes[2])
        return (left[0] + right[0], merged_votes)

    parsed = kvs.map(json.loads)
    scored = parsed.map(scoreTweet)
    sentiments = scored.map(asCountRecord).reduceByKey(mergeTallies)

    sentiments.pprint()
    sendTweetSentiments(sentiments, url)

def sendTopHashtagsFromStream(kvs, url):
    """Count hashtags per batch, rank them by frequency and forward them to ``url``.

    Every element of ``kvs`` is parsed as JSON and must provide
    ``["user"]["screen_name"]`` and ``["text"]``. The text is split into
    whitespace-separated tokens; tokens of at least two characters that start
    with ``#`` are counted, sorted by descending count, printed with ``pprint``
    and handed to ``sendTopWords``.
    """
    tweets = kvs.map(lambda x: json.loads(x)) \
                .map(lambda json_object: (json_object["user"]["screen_name"], json_object["text"]))

    # split() with no argument breaks on any run of whitespace, so hashtags
    # separated by newlines or tabs are counted individually instead of being
    # glued together into one token.
    words = tweets.flatMap(lambda line: line[1].split())

    ## This part does the hashtag count
    hashtag_counts = words.filter(lambda word: len(word) >= 2 and word[0] == '#') \
                          .map(lambda word: (word, 1)) \
                          .reduceByKey(add) \
                          .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending = False))
    hashtag_counts.pprint()
    sendTopWords(hashtag_counts, url)

def sendTopWordsFromStream(kvs, url):
    """Count ordinary words per batch, rank them by frequency and forward them to ``url``.

    Every element of ``kvs`` is parsed as JSON and must provide
    ``["user"]["screen_name"]`` and ``["text"]``. The text is split into
    whitespace-separated tokens, which are lower-cased. Tokens are dropped when
    they are English stop words or one of the extra ignored terms below, are
    shorter than two characters, or start with ``#`` or ``@``. The remaining
    words are counted, sorted by descending count, printed with ``pprint`` and
    handed to ``sendTopWords``.
    """
    tweets = kvs.map(lambda x: json.loads(x)) \
                .map(lambda json_object: (json_object["user"]["screen_name"], json_object["text"]))

    # split() with no argument breaks on any run of whitespace, so words
    # separated by newlines or tabs are no longer glued together; it also never
    # yields tokens with surrounding whitespace, so no strip() is needed.
    words = tweets.flatMap(lambda line: line[1].split())

    ## This part does the word count
    # A frozenset gives constant-time membership tests; the filter below runs
    # once for every word of every tweet.
    sw = frozenset(stopwords.words('english')).union(
        ['rt', 'https', 'http', 'coronavirus', 'covid19', 'covid-19'])

    counts = words.map(lambda word: word.lower()) \
                  .filter(lambda word: word not in sw) \
                  .filter(lambda word: len(word) >= 2 and word[0] != '#' and word[0] != '@') \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(add) \
                  .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending = False))
    counts.pprint()
    sendTopWords(counts, url)

In [None]:
# Reuse the already-running SparkContext when this cell is executed again;
# constructing a second one in the same kernel raises an error.
sc = SparkContext.getOrCreate(SparkConf().setAppName("tweetStream"))
# Create a local StreamingContext with a batch interval of 2 seconds
ssc = StreamingContext(sc, 2)
# Create a DStream that connects to hostname:port
kvs = ssc.socketTextStream("localhost", 5555)

In [None]:
try:
    # Skip the download (and its network round-trip) when the stop-word corpus
    # is already installed locally.
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('stopwords')

In [None]:
# Base URL of the dashboard server; each pipeline posts to its own endpoint.
server = 'http://localhost:5000/real-time/'

# Attach the three pipelines to the same input stream, in this order.
for attach_pipeline, endpoint in (
    (sendTopHashtagsFromStream, 'update_hashtagcounts'),
    (sendTopWordsFromStream, 'update_counts'),
    (sendTweetSentimentsFromStream, 'update_sentiments'),
):
    attach_pipeline(kvs, server + endpoint)

In [None]:
# Start computing
ssc.start()
try:
    # Block until the stream terminates, fails, or the kernel is interrupted
    ssc.awaitTermination()
finally:
    # Always shut the streaming context down gracefully, even when the wait
    # above is interrupted or raises; otherwise the stream is left running.
    ssc.stop(stopGraceFully = True)