In [None]:
# import sys
# Open "jupyter notebook" from the Anaconda prompt and then run this cell.
import os
# Ask spark-submit to pull in the Kafka 0.8 streaming connector package
# (Scala 2.11 build, version 2.4.5). This is set before the pyspark imports
# below so it is already in the environment when Spark is launched.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 pyspark-shell'
from elasticsearch import helpers
from elasticsearch import Elasticsearch
# NOTE(review): a function named sth2elastic is also defined later in this
# file; that later definition rebinds the name, so this imported version is
# not the one that ends up being called.
from ownelastic import sth2elastic
from pyspark.sql import *
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from datetime import datetime


import nltk
# Fetch the VADER lexicon needed by SentimentIntensityAnalyzer; this runs
# every time the module/cell is executed.
nltk.download('vader_lexicon')

# Earlier approach: a single module-level Elasticsearch client (kept for reference).
#es = Elasticsearch("http://localhost:9200")

# Scheme and host of the Elasticsearch node, without the port;
# getElastic() appends ':9200' to this value.
elasticurl = "http://localhost"

def getElastic():
    """Create an Elasticsearch client for ``elasticurl`` on port 9200.

    A new client is built on every call, configured with a 30 second timeout.
    """
    node_address = elasticurl + ':9200'
    return Elasticsearch(hosts=[node_address], timeout=30)

# Push documents into the given Elasticsearch index
def sth2elastic(doc, index, type):
    """Bulk-index every item of ``doc`` into Elasticsearch.

    Args:
        doc: Iterable of documents; each item becomes the ``_source`` of one
            "index" action.
        index: Name of the target index.
        type: Document type to index under. The name shadows the builtin but
            is kept so existing keyword callers continue to work.

    Returns:
        The number of documents that were indexed successfully. Documents
        that failed are printed and not counted; no exception is raised for
        them because the bulk helper is called with ``raise_on_error=False``
        and ``raise_on_exception=False``.
    """
    actions = [
        {
            "_op_type": "index",
            "_index": index,
            "_type": type,
            "_source": row,
        }
        for row in doc
    ]
    if not actions:
        # Nothing to send, so do not bother building a client.
        return 0

    esclient = getElastic()
    statcnt = 0
    for ok, response in helpers.streaming_bulk(esclient, actions, index=index, doc_type=type, max_retries=5, raise_on_error=False, raise_on_exception=False):
        if ok:
            statcnt += 1
        else:
            # Report the failed item but keep going with the rest of the batch.
            print(response)
    return statcnt

def getSqlContextInstance(sparkContext):
    """Return the shared SQLContext, creating it on first use.

    The instance is stored in this module's globals under the key
    'sqlContextSingletonInstance'. ``sparkContext`` is only used when the
    instance does not exist yet.
    """
    module_scope = globals()
    try:
        return module_scope['sqlContextSingletonInstance']
    except KeyError:
        instance = SQLContext(sparkContext)
        module_scope['sqlContextSingletonInstance'] = instance
        return instance

# Score the sentiment of a tweet using the VADER analyzer
def _getSentimentAnalyzer():
    """Return a shared SentimentIntensityAnalyzer, creating it on first use.

    Uses the same globals()-based singleton approach as
    getSqlContextInstance so the analyzer is not rebuilt for every tweet.
    """
    if '_sentimentAnalyzerSingletonInstance' not in globals():
        globals()['_sentimentAnalyzerSingletonInstance'] = SentimentIntensityAnalyzer()
    return globals()['_sentimentAnalyzerSingletonInstance']


def sentimentAnalysis(tweet):
    """Score ``tweet`` with VADER and return the scores as a JSON string.

    Args:
        tweet: Text to analyse.

    Returns:
        A JSON object string with the keys "pos", "neu", "neg" and
        "compound" (in that order) holding the analyzer's polarity scores.

    Raises:
        KeyError: If the analyzer result lacks one of the four keys.
    """
    ss = _getSentimentAnalyzer().polarity_scores(tweet)
    # Fixed key order keeps the serialised output stable.
    scores = {key: ss[key] for key in ('pos', 'neu', 'neg', 'compound')}
    return json.dumps(scores)

def _toJsonText(record):
    """Return ``record`` as JSON text; strings are passed through unchanged."""
    return record if isinstance(record, str) else json.dumps(record)


def _sentimentLabel(compound):
    """Map a compound score to "positive", "negative" or "neutral"."""
    if compound >= 0.05:
        return "positive"
    if compound <= -0.05:
        return "negative"
    return "neutral"


def _enrichTweet(result):
    """Turn one collected row (a dict) into the document that gets indexed.

    Mutates ``result`` in place: parses "date" into a datetime, decodes the
    "sentiment" JSON string into a dict, and adds the three percentage
    fields plus "sentiment_type".
    """
    result["date"] = datetime.strptime(result["date"], '%a %b %d %H:%M:%S %z %Y')
    result["sentiment"] = json.loads(result["sentiment"])
    sentiment = result["sentiment"]

    # Percentages are stored as strings with two decimals.
    # NOTE(review): "negative_pecentage" is misspelled; the key is left
    # unchanged here because renaming it changes the indexed field name --
    # confirm with consumers of the index before fixing.
    result["negative_pecentage"] = format(sentiment['neg']*100, '.2f')
    result["neutral_percentage"] = format(sentiment['neu']*100, '.2f')
    result["positive_percentage"] = format(sentiment['pos']*100, '.2f')

    result["sentiment_type"] = _sentimentLabel(sentiment['compound'])
    return result


def do_process(time, rdd):
    """Score one micro-batch of tweets and push the results to Elasticsearch.

    Retweets (text starting with 'RT @') are dropped, each remaining tweet
    gets a sentiment score via sentimentAnalysis, and the enriched records
    are sent to the "tweets_new" index with document type "doc".

    Args:
        time: Batch time supplied by foreachRDD; only printed.
        rdd: The batch's records. Non-string records are serialised to JSON
            text before being handed to the DataFrame JSON reader.

    Returns:
        None. Any exception raised while handling the batch is printed and
        swallowed.
    """
    print("========= %s =========" % str(time))
    try:
        if rdd.isEmpty():
            print('Waiting for data....')
            return

        sqlContext = getSqlContextInstance(rdd.context)
        # The JSON reader expects JSON text; serialise non-string records
        # explicitly so it is not given a Python repr instead.
        df = sqlContext.read.json(rdd.map(_toJsonText))
        df = df.filter("text not like 'RT @%'")

        # Compute the polarity scores with the VADER analyzer.
        udf_func = udf(sentimentAnalysis, returnType=StringType())
        df = df.withColumn("sentiment", udf_func(df.text))

        results = df.toJSON().map(json.loads).collect()
        if not results:
            # Every record in the batch was filtered out.
            print('Waiting for data....')
            return

        for result in results:
            _enrichTweet(result)

        # Push the enriched records into Elasticsearch.
        print("after sentiment and before push to elastice search: ", results)
        sth2elastic(results,"tweets_new","doc")
    except Exception as e:
        # Deliberately broad: a failing batch is reported and skipped rather
        # than propagating out of the foreachRDD callback.
        print(e)


if __name__ == "__main__":
    # Spark configuration: local master with 4 threads
    conf = SparkConf().setMaster("local[4]").setAppName("TwitterStreaming")

    # Reuse an existing SparkContext if there is one, otherwise create it
    sc = SparkContext.getOrCreate(conf=conf)

    # Streaming context with a batch interval of 10
    ssc = StreamingContext(sc, 10)

    # Consume tweets from the "tweets_new" Kafka topic
    kafkaStream = KafkaUtils.createStream(
        ssc,
        "localhost:2181",
        "consumer-group",
        {"tweets_new": 1},
    )

    # Element 1 of every stream record is decoded from JSON
    lines = kafkaStream.map(lambda record: json.loads(record[1]))

    # Hand every batch to do_process
    lines.foreachRDD(do_process)

    ssc.start()
    ssc.awaitTermination()

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\shiva\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


Fetching....
[Row(date='Sat Apr 18 22:06:15 +0000 2020', text="Someone is fixin' to get audited!", user='agargmd'), Row(date='Sat Apr 18 22:06:15 +0000 2020', text='Is #Trump on his period or something???\n🤔🤔🤔🤔🤔\n#MANopause #COVID #PressBriefing #PressConference #BreakingNews #POTUS #TrumpLiesAmericansDie #TrumpMeltdown #TrumpIsTheWORSTPresidentEVER #TrumpLiesPeopleDie https://t.co/NwJc1maVyd', user='mmtexas'), Row(date='Sat Apr 18 22:06:17 +0000 2020', text="Fucking RESTOCK Looks like I'll be at home until January 20th 2021\n#25thAmendmentNow #GOPBetrayedAmerica #Trump #TrumpLiesAmericansDie #VoteBlue #VoteBlueNoMatterWho #VoteBlue2020 #Biden2020 #TrumpOwnsEveryDeath #COVID https://t.co/IpZdoQkCge", user='LaurieWalters19'), Row(date='Sat Apr 18 22:06:18 +0000 2020', text='@realphilhendrie I find it strange that being tough on China isn’t universal. #Trump is a weakling .', user='ProHulaHooper'), Row(date='Sat Apr 18 22:06:20 +0000 2020', text='#Trump "They\'re trying to take your guns

[Row(date='Sat Apr 18 22:07:31 +0000 2020', text='@AngrierWHStaff From parks in Texas, to the steps at the Capitol in Michigan to the beaches of Jacksonville Florida...he is trying cause civil riots throughout the United States because #Trump knows he WON’T be re-elected #TrumpIsTheWORSTPresidentEVER', user='gkgguy'), Row(date='Sat Apr 18 22:07:32 +0000 2020', text='Is it time to sit down with my 20 year old son and have a serious, terrifying discussion about how #Trump is trying to kill all of us? \n\nNot joking.', user='LynSonyaA'), Row(date='Sat Apr 18 22:07:34 +0000 2020', text='#WHO IS #CHINA FRIENDLY! AND HATES #USA. BLAME #FUCKINGCHINA FOR THIS VIRUS AND NOT #UnitedStatesOfAmerica NOR #TRUMP. SHIT CAUSED BY CHINA AND THEY KILLED SO MANY PEOPLE. #CHINESE LIED AND COVERED THIS SHIT AND CAUSED A HUGE PROBLEM. ITS WAS FUCKING CHINA!!!!!!', user='Martin25150073')]
after sentiment: [Row(date='Sat Apr 18 22:07:31 +0000 2020', text='@AngrierWHStaff From parks in Texas, to the steps at t

after sentiment: [Row(date='Sat Apr 18 22:07:46 +0000 2020', text='@JoeBiden Have you been sleeping?#COVID #Trump', user='wordsaretools', sentiment='{"pos": 0.0, "neu": 1.0, "neg": 0.0, "compound": 0.0}'), Row(date='Sat Apr 18 22:07:48 +0000 2020', text="Comment: 04182020\n#Trump suggests wrongfully the US deaths fr #COVID19 will be 60k. It's to early to tell the 60k is on the premiss of high mitigation measures and follow up by the public. Lessening the mitigation will raise the R0 with many more people infected, more deaths.", user='PHD2468', sentiment='{"pos": 0.0, "neu": 0.931, "neg": 0.069, "compound": -0.539}'), Row(date='Sat Apr 18 22:07:49 +0000 2020', text='"The democrats are trying to take the 2nd amendment away" - Uh.....no, not really...... This very much feels like his attempt at restarting the civil war to distract from his failures around the handling of COVID.  #Trump #TrumpPressConf', user='realhumanpotato', sentiment='{"pos": 0.062, "neu": 0.736, "neg": 0.203, "compou

Tweet text is :  What.
Overall sentiment dictionary is :  {'pos': 0.0, 'neu': 1.0, 'neg': 0.0, 'compound': 0.0}
sentence was rated as  0.0 % Negative
sentence was rated as  100.0 % Neutral
sentence was rated as  0.0 % Positive
Neutral
old cal NEUTRAL
Tweet text is :  https://t.co/Nj6IZsY8Wz #ComeTogether...#VoteBlue! (Short #Parody and #ProtestSong by #SpencerKarter) For @ELISYELCAJIGAS, @2017fan1, @RamosRazviel, @nesman1985, @TailzYT_osu, @olderbrother21, @JhaSimba, and @jamesjimcie. #Trump is #Treacherous! #TrumpLiesAmericansDie! #FDT
Overall sentiment dictionary is :  {'pos': 0.0, 'neu': 1.0, 'neg': 0.0, 'compound': 0.0}
sentence was rated as  0.0 % Negative
sentence was rated as  100.0 % Neutral
sentence was rated as  0.0 % Positive
Neutral
old cal NEUTRAL
Tweet text is :  Our mistake was taking the absurd man @realDonaldTrump seriously.
#absurd #COVIDIOTS #Trump
https://t.co/OGmVyqzAgP
Overall sentiment dictionary is :  {'pos': 0.0, 'neu': 0.728, 'neg': 0.272, 'compound': -0.4767}

[Row(date='Sat Apr 18 22:08:06 +0000 2020', text='@paulsperry_ @realDonaldTrump WRONG!! #Trump just started the spikes again by encouraging his people to openly ignoring the social distancing rules. These fools will cause another outbreaks, continue to tank the economy and endanger the healthcare professionals #TrumpLiesAmericansDie . https://t.co/SODqpYDItJ', user='PamTeaTea'), Row(date='Sat Apr 18 22:08:06 +0000 2020', text='@MrsAmerica2020 @Breaking911 Any system breeds tyranny if the necessary megalomaniac is in power. Cf. #Trump and his push for #FascistGOP rather than simply the conservatives previously in charge of the party. \n\nBut guessing you as operator of a fake account may already know that. https://t.co/dku5AIYzaa', user='tejaswoman'), Row(date='Sat Apr 18 22:08:07 +0000 2020', text='The Divider in chief president #Trump.', user='thaha_sherwani'), Row(date='Sat Apr 18 22:08:08 +0000 2020', text="'Ask not what your country can do for you, ask what you can do for your coun

