In [None]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, Row
import seaborn as sns
import matplotlib.pyplot as plt
import time
from pyspark.ml.feature import NGram
from pyspark.sql.functions import udf,col
import re 
import emoji
from pyspark.ml.feature import Tokenizer
import nltk
from nltk.corpus import stopwords
import string
from pyspark.sql.functions import col, lit
from functools import reduce
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud 
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
import math
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# Host and port handed to spark(), which opens a socket text stream on them.
TCP_IP = "localhost"
TCP_PORT = 9876
# Passed to spark() as KEY_WORD; spark() does not reference it in its body.
KEY_WORD = 'nba'
# IPython magic (notebook only): render matplotlib figures inline.
%matplotlib inline

def cleaner(text):
    """Strip emojis, URLs, retweet markers, mentions and punctuation from a tweet.

    Args:
        text: Raw tweet text.

    Returns:
        The text with emojis, ``http...`` links, ``RT @user`` prefixes,
        ``@user`` mentions and every character of ``string.punctuation``
        removed. Whitespace is left untouched, so the result can still
        contain leading, trailing or repeated spaces.
    """
    # emoji >= 2.0 removed get_emoji_regexp(); replace_emoji() is its
    # replacement. Fall back to the old call on releases that predate it.
    if hasattr(emoji, "replace_emoji"):
        no_emoji = emoji.replace_emoji(text, replace='')
    else:
        no_emoji = emoji.get_emoji_regexp().sub(u'', text)
    no_url = re.sub(r'http\S+', '', no_emoji)
    # Drop "RT @user" first so the "RT " prefix does not survive the
    # generic mention removal below.
    no_rt = re.sub(r"RT @[A-Za-z0-9_]+", "", no_url)
    no_mentions = re.sub(r"@[A-Za-z0-9_]+", "", no_rt)
    # One pass removes all ASCII punctuation (":" included).
    return no_mentions.translate(str.maketrans('', '', string.punctuation))

def wordToken(text):
    """Tokenize ``text`` with NLTK and drop English stop words.

    Args:
        text: The string to tokenize.

    Returns:
        A list of the tokens that are not in NLTK's English stop-word list,
        in their original order.
    """
    tokens = nltk.word_tokenize(text)
    english_stops = set(stopwords.words('english'))
    kept = []
    for token in tokens:
        if token in english_stops:
            continue
        kept.append(token)
    return kept

def wordTokenize(text):
    """Tokenize ``text``, drop English stop words and lemmatize what is left.

    Args:
        text: The string to process.

    Returns:
        A single string: the lemmatized, non-stop-word tokens joined by
        one space each.
    """
    tokens = nltk.word_tokenize(text)
    stop_words = set(stopwords.words('english'))
    kept = [w for w in tokens if w not in stop_words]
    # This function runs once per row; only call the NLTK downloader when
    # the WordNet corpus cannot be located, instead of on every call.
    try:
        nltk.data.find('corpora/wordnet')
    except LookupError:
        nltk.download('wordnet')
    lemmatizer = WordNetLemmatizer()
    return " ".join(lemmatizer.lemmatize(w) for w in kept)

def sentiment_analysis(text):
    """Return the VADER compound polarity score of ``text``.

    Args:
        text: The string to score.

    Returns:
        The ``'compound'`` entry of ``SentimentIntensityAnalyzer.polarity_scores``,
        i.e. a number, not a positive/negative/neutral label. Callers in
        this file compare it against 0 and convert it with ``float()``.
    """
    # The previous body also derived a text label from the score but never
    # returned or used it; that dead code has been dropped.
    return SentimentIntensityAnalyzer().polarity_scores(text)['compound']


def extractPhraseFunct(x):
    """Extract noun phrases from ``x`` using an NLTK regexp chunker.

    The text is tokenized with ``sentence_re``, POS-tagged, and chunked with
    a grammar whose ``NP`` rule is built from noun/adjective runs (``NBAR``).
    For every ``NP`` subtree, the words that are not English stop words are
    joined with spaces.

    Args:
        x: The string to analyse.

    Returns:
        A list of non-empty phrase strings, in the order the ``NP`` subtrees
        are visited.
    """
    from nltk.corpus import stopwords
    english_stops = set(stopwords.words('english'))

    sentence_re = r'(?:(?:[A-Z])(?:.[A-Z])+.?)|(?:\w+(?:-\w+)*)|(?:\$?\d+(?:.\d+)?%?)|(?:...|)(?:[][.,;"\'?():-_`])'
    grammar = r"""
    NBAR:
        {<NN.*|JJ>*<NN.*>}  # Nouns and Adjectives, terminated with Nouns
        
    NP:
        {<NBAR>}
        {<NBAR><IN><NBAR>}  # Above, connected with in/of/etc...
    """
    np_chunker = nltk.RegexpParser(grammar)
    tagged = nltk.tag.pos_tag(nltk.regexp_tokenize(x, sentence_re))
    parsed = np_chunker.parse(tagged)

    phrases = []
    for np_subtree in parsed.subtrees(filter=lambda t: t.label() == 'NP'):
        # Keep only the words of this noun phrase that are not stop words.
        kept = [word for word, _tag in np_subtree.leaves() if word not in english_stops]
        if kept:
            phrases.append(' '.join(kept))

    # A phrase can still be the empty string; filter those out.
    return [phrase for phrase in phrases if phrase]

def word_counter(pddf):
    """Plot the 20 most frequent words found in the ``tweet_split`` column.

    Args:
        pddf: An object exposing ``toPandas()`` whose result has a
            ``tweet_split`` column. Each value is handled as a stringified
            list such as ``"[a, b]"`` (brackets are stripped and the rest is
            split on ``", "``) -- assumes the column holds strings; confirm
            against the caller.

    Returns:
        None. Shows a horizontal bar chart, or does nothing when there are
        no words to plot.
    """
    pd_df = pddf.toPandas()
    wordlist = []
    for tweet in pd_df['tweet_split']:
        stripped = tweet.replace("[", "").replace("]", "").replace("’", "")
        wordlist.extend(word for word in stripped.split(", ") if word != "")

    # Count and rank locally. The earlier version went back through
    # `sc`/`spark`, neither of which is a usable context at module scope, and
    # relied on `limit 20` without an ORDER BY to pick the top rows.
    top_words = Counter(wordlist).most_common(20)
    if not top_words:
        # pandas cannot plot an empty frame.
        return

    pandD = pd.DataFrame(top_words, columns=['Keywords', 'Frequency'])
    # DataFrame.plot opens its own figure, so no plt.figure() is needed.
    pandD.plot.barh(x='Keywords', y='Frequency', rot=1, figsize=(10, 8))
    plt.show()

    
    
def nba_finals(text):
    """Label a tweet with the team(s) it mentions and the sign of its sentiment.

    The lower-cased text is scored with ``sentiment_analysis`` and searched
    for the keywords below using substring containment.

    Args:
        text: The tweet text.

    Returns:
        ``"<subject> <mood>"`` where mood is ``positive`` / ``negative`` /
        ``neutral`` (score > 0, < 0, == 0) and subject is:
        ``warriors and celtics`` when keywords of both teams occur,
        ``warriors`` or ``celtics`` when only one team's keywords occur,
        ``warriors and celtics`` when only a generic finals keyword occurs.
        ``None`` when no keyword occurs at all.
    """
    warriors_words = ["warriors","golden", "state", "gsw", "dubs", "dubnation",
                          "stephen","curry","steph","klay","thompson","green",
                          "draymond", "dray","chase","center","poole","looney",
                          "kerr", "andrew", "wiggins"]
    boston_words = ["boston", "celtics", "bos", "celts", "jayson","tatum","jaylen", "brown",
                        "marcus", "smart", "al", "horford", "td", "garden", "ime", "udoka",
                        "brad", "stevens", "grant", "williams"]
    finals_words = ["finals", "nba", "nbafinals", "playoffs", "game"]

    lowered = text.lower()
    score = sentiment_analysis(lowered)

    if score > 0:
        mood = "positive"
    elif score < 0:
        mood = "negative"
    elif score == 0:
        mood = "neutral"
    else:
        # Score compares false against 0 in every direction: no label.
        return None

    def mentions(vocabulary):
        # NOTE(review): this is substring matching, so short keywords such
        # as "al" also match inside longer words (e.g. "finals").
        return any(term in lowered for term in vocabulary)

    about_warriors = mentions(warriors_words)
    about_celtics = mentions(boston_words)

    if about_warriors and about_celtics:
        return "warriors and celtics " + mood
    if about_warriors:
        return "warriors " + mood
    if about_celtics:
        return "celtics " + mood
    if mentions(finals_words):
        # Generic finals talk is attributed to both teams.
        return "warriors and celtics " + mood
    return None
     
    
    
def spark(TCP_IP,TCP_PORT,KEY_WORD):
    """Start a Spark Streaming job that charts statistics of incoming tweets.

    Text lines are read from a TCP socket in 10-second batches over a
    500-second window. For every windowed RDD the tweets are cleaned and
    five charts are drawn: most frequent words, 2-grams and 3-grams, a
    histogram of sentiment scores, and a pie chart of team/sentiment labels.

    Args:
        TCP_IP: Host of the socket to read from.
        TCP_PORT: Port of the socket to read from.
        KEY_WORD: Not used by this function; kept so existing calls keep
            working.

    Returns:
        None. The streaming context is started and the function returns
        without waiting for it to terminate.
    """
    sc = SparkContext(appName="TwitterStreamming")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 10)

    socket_stream = ssc.socketTextStream(TCP_IP, TCP_PORT)
    lines = socket_stream.window(500)

    def plot_top(items, limit, label, title):
        """Bar-chart the ``limit`` most common non-empty strings in ``items``."""
        top = Counter(item for item in items if item != "").most_common(limit)
        if not top:
            # pandas cannot plot an empty frame; skip the chart instead.
            return
        frame = pd.DataFrame(top, columns=[label, 'frequency'])
        # DataFrame.plot opens its own figure, so no plt.figure() beforehand.
        frame.plot.barh(x=label, y='frequency', rot=1, figsize=(10, 8))
        plt.title(title)
        plt.show()

    def ngram_items(words_df, n):
        """Collect the n-grams of every tweet in ``words_df`` into one list."""
        model = NGram(n=n, inputCol="tweet_words", outputCol="ngrams")
        pd_ngrams = model.transform(words_df).select('ngrams').toPandas()
        items = []
        for grams in pd_ngrams['ngrams']:
            # NOTE(review): the slice drops the first and last n-gram of each
            # tweet. Kept as it was; confirm this is intended.
            items.extend(grams[1:len(grams) - 1])
        return items

    def process(rdd):
        """Draw all charts for one windowed RDD of tweet lines."""
        # A schema cannot be inferred from an empty RDD and createDataFrame
        # raises, so skip windows that carry no data.
        if rdd.isEmpty():
            return

        spark = SparkSession \
                .builder \
                .config(conf=rdd.context.getConf()) \
                .getOrCreate()

        tweets = spark.createDataFrame(rdd.map(lambda x: Row(word=x)))
        tweets = tweets.where(tweets.word != '')

        udf_cleaner = udf(cleaner)
        udf_tokenizer = udf(wordTokenize)
        udf_splitter = udf(wordToken)
        udf_sentiment = udf(sentiment_analysis)
        udf_teams = udf(nba_finals)

        cleaned = tweets.withColumn("tweet_content", udf_cleaner(col("word"))).select("tweet_content")
        cleaned = cleaned.where(cleaned.tweet_content != '')
        split_df = cleaned.withColumn("tweet_split", udf_splitter(col("tweet_content")))
        tokenized = cleaned.withColumn("tweet_tokenized", udf_tokenizer(col("tweet_content")))
        scored = tokenized.withColumn("sentiment_score", udf_sentiment(col("tweet_tokenized")))
        team_df = tokenized.withColumn("sentiment_team", udf_teams(col("tweet_tokenized")))
        words_data = Tokenizer(inputCol="tweet_content", outputCol="tweet_words").transform(cleaned)

        print("The graphs for the next batch: \n \n \n")

        # --- single-word frequencies -------------------------------------
        # Each tweet_split value is handled as a stringified list
        # ("[a, b]"): strip the brackets and split on ", ".
        words = []
        for tweet in split_df.select("tweet_split").toPandas()['tweet_split']:
            stripped = tweet.lower().replace("[", "").replace("]", "").replace("’", "")
            words.extend(stripped.split(", "))
        plot_top(words, 20, 'word', "Frequency of words in tweets")

        # --- 2-gram and 3-gram frequencies -------------------------------
        plot_top(ngram_items(words_data, 2), 15, 'twograms', "Frequency of 2grams in tweets")
        plot_top(ngram_items(words_data, 3), 15, 'threegrams', "Frequency of 3grams in tweets")

        # --- sentiment histogram -----------------------------------------
        pd_sent = scored.select("sentiment_score").toPandas()
        scores = [float(score) for score in pd_sent['sentiment_score']]
        if scores:
            bins = np.linspace(-1, 1, 10)
            plt.figure()
            plt.hist(scores, bins=bins)
            # Red vertical line marks the mean score.
            plt.axvline(x=np.mean(scores), color='r')
            plt.title('Histogram of sentiment score for each tweet')
            plt.xlabel('Score')
            plt.ylabel('Value')
            plt.show()

        # --- team / sentiment pie chart ----------------------------------
        total_tweets = team_df.count()
        team_df.where(col("sentiment_team").isNotNull()).createOrReplaceTempView("teams")
        pd_teams = spark.sql("select  sentiment_team, count(sentiment_team) as sentiment from teams group by sentiment_team").toPandas()
        if total_tweets and not pd_teams.empty:
            labelled = int(pd_teams['sentiment'].sum())
            # Round the percentage itself so the title does not show float
            # artifacts such as 56.699999999999996.
            num = round(100.0 * labelled / total_tweets, 1)
            pd_teams.groupby(['sentiment_team']).sum().plot(kind='pie', y='sentiment', figsize=(5, 5),legend = False)
            plt.title(f"The {num} percent of tweets deal with the finals' rivarly")
            plt.show()

    lines.foreachRDD(process)
    ssc.start()

# Entry point: start the streaming job with the constants defined above.
if __name__=="__main__":
    
    spark(TCP_IP,TCP_PORT,KEY_WORD)

### Task 2

In [None]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, Row
import seaborn as sns
import matplotlib.pyplot as plt
import time
from pyspark.ml.feature import NGram
from pyspark.sql.functions import udf,col
import re 
import emoji
from pyspark.ml.feature import Tokenizer
import nltk
from nltk.corpus import stopwords
import string
from pyspark.sql.functions import col, lit
from functools import reduce
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud 
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
import math
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

def spark(TCP_IP,TCP_PORT):
    """Start a Spark Streaming job that charts the most frequent hashtags.

    Text lines are read from a TCP socket in 5-second batches over a
    300-second window, split on spaces, and the tokens starting with ``#``
    are counted. For every windowed RDD a bar chart of the 20 most frequent
    hashtags is drawn.

    Args:
        TCP_IP: Host of the socket to read from.
        TCP_PORT: Port of the socket to read from.

    Returns:
        None. The streaming context is started and the function returns
        without waiting for it to terminate.
    """
    sc = SparkContext(appName="TwitterStreamming")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 5)

    socket_stream = ssc.socketTextStream(TCP_IP, TCP_PORT)

    lines = socket_stream.window(300)
    hashtags = lines.flatMap(lambda x: x.split(" ")) \
                    .filter(lambda x: x.startswith("#"))

    def process(rdd):
        """Count the hashtags of one windowed RDD and plot the top 20."""
        # A schema cannot be inferred from an empty RDD and createDataFrame
        # raises, so skip windows without any hashtag.
        if rdd.isEmpty():
            return

        spark = SparkSession \
                .builder \
                .config(conf=rdd.context.getConf()) \
                .getOrCreate()

        wordsDataFrame = spark.createDataFrame(rdd.map(lambda x: Row(word=x)))
        wordsDataFrame.createOrReplaceTempView("words")
        # Only 20 rows are plotted, so limit in the query instead of
        # collecting every distinct hashtag to the driver.
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word order by 2 desc limit 20")
        pd_df = wordCountsDataFrame.toPandas()
        if pd_df.empty:
            return

        plt.figure( figsize = ( 10, 8 ) )
        sns.barplot( x="total", y="word", data=pd_df)
        plt.title("Most popular hashtags")
        plt.show()

    hashtags.foreachRDD(process)

    ssc.start()
    
# Host and port handed to spark(), which opens a socket text stream on them.
TCP_IP = "localhost"
TCP_PORT = 9876  
    
# Entry point: start the hashtag streaming job.
if __name__=="__main__":
    
    spark(TCP_IP,TCP_PORT)