In [None]:
import requests
import sys

In [None]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

In [None]:
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk
nltk.download('vader_lexicon')

In [None]:
def get_sql_context_instance(spark_context):
    """Return the shared SQLContext, creating it on first use.

    The instance is kept in this module's globals under the key
    'sqlContextSingletonInstance', so once it exists every later call
    returns that same object regardless of the argument passed in.

    Args:
        spark_context: SparkContext used to construct the SQLContext the
            first time this function is called.

    Returns:
        The cached SQLContext instance.
    """
    module_globals = globals()
    try:
        return module_globals['sqlContextSingletonInstance']
    except KeyError:
        # First call: build the context and remember it for later batches.
        instance = SQLContext(spark_context)
        module_globals['sqlContextSingletonInstance'] = instance
        return instance

In [None]:
def aggregate_data_count(new_values, total_sum):
    """Add the values seen in the current batch to the running total.

    Used as the update function for ``updateStateByKey``.

    Args:
        new_values: Iterable of numbers observed for a key in this batch.
        total_sum: Previous running total for the key; a falsy value
            (``None`` or zero) is treated as 0.

    Returns:
        The new running total.
    """
    previous_total = total_sum if total_sum else 0
    batch_total = sum(new_values)
    return batch_total + previous_total

## Sentiment Analysis

#### Polarity Detection
Functions below accept a string and return a polarity score.

In [None]:
def _get_sentiment_analyzer():
    """Return a shared SentimentIntensityAnalyzer, creating it on first use.

    Constructing the analyzer loads its lexicon, so building a new one for
    every scored string is wasteful. The instance is cached in the function's
    globals (same pattern as the SQLContext singleton in this file) and is
    looked up by string key so it is created lazily wherever the scoring
    functions actually run.
    """
    analyzer = globals().get('_sentiment_analyzer_instance')
    if analyzer is None:
        analyzer = SentimentIntensityAnalyzer()
        globals()['_sentiment_analyzer_instance'] = analyzer
    return analyzer


def _polarity_score(text, key):
    """Return the ``key`` entry of the analyzer's polarity scores for ``text``."""
    return _get_sentiment_analyzer().polarity_scores(text)[key]


def compound_detection(text):
    """Return the "compound" polarity score of ``text``."""
    return _polarity_score(text, "compound")


def pos_detection(text):
    """Return the "pos" (positive) polarity score of ``text``."""
    return _polarity_score(text, "pos")


def neu_detection(text):
    """Return the "neu" (neutral) polarity score of ``text``."""
    return _polarity_score(text, "neu")


def neg_detection(text):
    """Return the "neg" (negative) polarity score of ``text``."""
    return _polarity_score(text, "neg")

#### Sentiment Analysis
Function sentiment_analysis accepts a DataFrame as input.  
Creates new column in a DataFrame for each polarity score.
Returns updated DataFrame.

In [None]:
def sentiment_analysis(tweets):
    """Add one polarity-score column per score type to a tweets DataFrame.

    Each scoring function is wrapped in a UDF declared with ``StringType``
    and applied to the "tweet" column. The results are stored in the columns
    "comp", "pos", "neu" and "neg", in that order.

    Args:
        tweets: DataFrame with a "tweet" text column.

    Returns:
        The DataFrame with the four score columns added (or replaced).
    """
    scorers = (
        ("comp", compound_detection),
        ("pos", pos_detection),
        ("neu", neu_detection),
        ("neg", neg_detection),
    )
    for column_name, scorer in scorers:
        score_udf = F.udf(scorer, StringType())
        tweets = tweets.withColumn(column_name, score_udf("tweet"))
    return tweets

#### Cleaning Data
Function clean_tweets accepts a DataFrame as input.  
Cleans text in tweet column of all links, hashtags, mentioned users, as well as several punctuation marks.  
Returns updated DataFrame.  

In [None]:
def clean_tweets(tweets):
    """Drop empty rows and strip noise from the "tweet" column.

    Empty strings are converted to nulls and rows containing nulls are
    dropped. The tweet text is then cleaned of links, hashtags, mentioned
    users, colons, the retweet marker "RT", and any character that is not an
    ASCII letter, digit, space or hyphen, and finally trimmed.

    Args:
        tweets: DataFrame with a "tweet" text column.

    Returns:
        The cleaned DataFrame.
    """
    tweets = tweets.na.replace('', None)
    tweets = tweets.na.drop()

    # Applied in order; each pattern is replaced with the empty string.
    patterns = (
        r'http\S+',           # links
        r'(#\w+)',            # hashtags
        r'(@\w+)',            # mentioned users
        ':',
        # Retweet marker only. The word boundaries keep "RT" inside words
        # such as "SMART" or "ALERT" from being cut out.
        r'\bRT\b',
        r'[^a-zA-Z0-9 -]',    # remaining punctuation / non-ASCII characters
    )
    for pattern in patterns:
        tweets = tweets.withColumn('tweet', F.regexp_replace('tweet', pattern, ''))
    tweets = tweets.withColumn('tweet', F.trim(tweets.tweet))

    return tweets

#### Send analysis to Flask Application
Function send_sentiment_analysis_to_dashboard accepts a DataFrame as input.  
Extracts sentiment analysis results.  
Computes mean for all polarity values.  
Sends values to Flask application.

In [None]:
def send_sentiment_analysis_to_dashboard(df):
    """Post the mean of each polarity column to the dashboard application.

    The "comp", "pos", "neu" and "neg" columns are collected in a single
    pass, averaged, and sent as strings in a form-encoded POST request.

    Args:
        df: DataFrame containing the columns "comp", "pos", "neu" and "neg"
            with values convertible to ``float``.

    Returns:
        None. Nothing is sent when the DataFrame has no rows.

    Raises:
        requests.exceptions.RequestException: If the POST fails or times out.
    """
    score_columns = ("comp", "pos", "neu", "neg")
    # One collect() for all columns instead of one Spark job per column.
    rows = df.select(*score_columns).collect()
    if not rows:
        # No tweets in this batch: there is no mean to compute or report.
        return

    row_count = len(rows)
    request_data = {
        column: str(sum(float(row[column]) for row in rows) / row_count)
        for column in score_columns
    }
    url = 'http://localhost:5001/sentiment/updateData'
    # A timeout keeps an unresponsive dashboard from blocking the stream.
    requests.post(url, data=request_data, timeout=5)

#### Process RDDs for Sentiment Analysis
Get spark sql singleton context from the current context.  
Convert the RDD to Row RDD.  
Create a DF from the Row RDD.  
Register the dataframe as table.  
Select up to 50 tweets from the table using SQL.  
Clean the tweets, compute their polarity scores, and send the mean scores to the dashboard.

In [None]:
def process_rdd_sentiment(time, rdd):
    """Run sentiment analysis on one batch and send the result to the dashboard.

    Converts the RDD of pairs into a DataFrame with columns "tweet" and
    "comp", selects up to 50 rows ordered by "comp" descending, cleans the
    tweet text, computes the polarity scores and posts their means.

    Errors are reported on stdout and do not propagate, so one failing batch
    does not stop the stream.

    Args:
        time: Batch time supplied by ``foreachRDD``; only printed.
        rdd: RDD of two-element records (tweet text, value).
    """
    print("SENTIMENT----------- %s -----------" % str(time))
    try:
        if rdd.isEmpty():
            # createDataFrame cannot infer a schema from an empty RDD.
            return
        sql_context = get_sql_context_instance(rdd.context)
        row_rdd = rdd.map(lambda w: Row(tweet=w[0], comp=w[1]))
        sentiment_df = sql_context.createDataFrame(row_rdd)
        sentiment_df.registerTempTable("sentiment")
        sentiment_raw_df = sql_context.sql("select tweet, comp from sentiment order by comp desc limit 50")
        sentiment_clean_df = clean_tweets(sentiment_raw_df)
        sentiment_analyzed_df = sentiment_analysis(sentiment_clean_df)
        send_sentiment_analysis_to_dashboard(sentiment_analyzed_df)
    except Exception as e:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # through; the message is printed along with the exception type.
        print("Error: %s: %s" % (type(e).__name__, e))

## Extracting Top Hashtags

#### Send hashtags to Flask Application
Extract the hashtags from the dataframe and convert them into an array.  
Extract the counts from dataframe and convert them into array.  
Initialize and send the data through REST API.

In [None]:
def send_hashtag_df_to_dashboard(df):
    """Post the hashtags and their counts to the dashboard application.

    Args:
        df: DataFrame with the columns "hashtag" and "hashtag_count".

    Raises:
        requests.exceptions.RequestException: If the POST fails or times out.
    """
    # Collect both columns together so each label stays paired with its own
    # count; two separate collect() jobs may order tied rows differently.
    rows = df.select("hashtag", "hashtag_count").collect()
    top_tags = [str(row.hashtag) for row in rows]
    tags_count = [row.hashtag_count for row in rows]
    url = 'http://localhost:5001/hashtags/updateData'
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    # A timeout keeps an unresponsive dashboard from blocking the stream.
    requests.post(url, data=request_data, timeout=5)

#### Process RDDs for Hashtags
Get spark sql singleton context from the current context.  
Convert the RDD to Row RDD.  
Create a DF from the Row RDD.  
Register the dataframe as table.  
Get the top 20 hashtags by count from the table using SQL.  
Send the resulting dataframe to the dashboard.

In [None]:
def process_rdd_hashtags(time, rdd):
    """Send the 20 most frequent hashtags of one batch to the dashboard.

    Converts the RDD of pairs into a DataFrame with columns "hashtag" and
    "hashtag_count", selects the 20 rows with the highest count and posts
    them.

    Errors are reported on stdout and do not propagate, so one failing batch
    does not stop the stream.

    Args:
        time: Batch time supplied by ``foreachRDD``; only printed.
        rdd: RDD of two-element records (hashtag, count).
    """
    print("HASHTAG----------- %s -----------" % str(time))
    try:
        if rdd.isEmpty():
            # createDataFrame cannot infer a schema from an empty RDD.
            return
        sql_context = get_sql_context_instance(rdd.context)
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
        hashtags_df = sql_context.createDataFrame(row_rdd)
        hashtags_df.registerTempTable("hashtags")
        hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 20")
        send_hashtag_df_to_dashboard(hashtag_counts_df)
    except Exception as e:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # through; the message is printed along with the exception type.
        print("Error: %s: %s" % (type(e).__name__, e))

## Extracting Top Mentioned Users

#### Send mentioned users to Flask Application
Extract the mentioned users from the dataframe and convert them into an array.  
Extract the counts from dataframe and convert them into array.  
Initialize and send the data through REST API.

In [None]:
def send_user_mentions_df_to_dashboard(df):
    """Post the mentioned users and their counts to the dashboard application.

    Args:
        df: DataFrame with the columns "mention" and "mention_count".

    Raises:
        requests.exceptions.RequestException: If the POST fails or times out.
    """
    # Collect both columns together so each label stays paired with its own
    # count; two separate collect() jobs may order tied rows differently.
    rows = df.select("mention", "mention_count").collect()
    top_users = [str(row.mention) for row in rows]
    users_count = [row.mention_count for row in rows]
    url = 'http://localhost:5001/mentions/updateData'
    request_data = {'label': str(top_users), 'data': str(users_count)}
    # A timeout keeps an unresponsive dashboard from blocking the stream.
    requests.post(url, data=request_data, timeout=5)

#### Process RDDs for Mentioned Users
Get spark sql singleton context from the current context.  
Convert the RDD to Row RDD.  
Create a DF from the Row RDD.  
Register the dataframe as table.  
Get the top 20 mentioned users by count from the table using SQL.  
Send the resulting dataframe to the dashboard.

In [None]:
def process_rdd_mentions(time, rdd):
    """Send the 20 most frequently mentioned users of one batch to the dashboard.

    Converts the RDD of pairs into a DataFrame with columns "mention" and
    "mention_count", selects the 20 rows with the highest count and posts
    them.

    Errors are reported on stdout and do not propagate, so one failing batch
    does not stop the stream.

    Args:
        time: Batch time supplied by ``foreachRDD``; only printed.
        rdd: RDD of two-element records (mention, count).
    """
    print("MENTION----------- %s -----------" % str(time))
    try:
        if rdd.isEmpty():
            # createDataFrame cannot infer a schema from an empty RDD.
            return
        sql_context = get_sql_context_instance(rdd.context)
        row_rdd = rdd.map(lambda w: Row(mention=w[0], mention_count=w[1]))
        mentions_df = sql_context.createDataFrame(row_rdd)
        mentions_df.registerTempTable("mentions")
        mentions_count_df = sql_context.sql("select mention, mention_count from mentions order by mention_count desc limit 20")
        send_user_mentions_df_to_dashboard(mentions_count_df)
    except Exception as e:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # through; the message is printed along with the exception type.
        print("Error: %s: %s" % (type(e).__name__, e))

## Spark Streaming

#### Setting Up Spark
Create spark configurations.  
Create spark context with the above configuration.  
Create the Streaming Context from the above spark context with interval size 2 seconds.  
Setting a checkpoint to allow RDD recovery.  
Read data from port 5555.

In [None]:
# Application configuration and the core Spark context.
conf = SparkConf().setAppName("TwitterStreamApp")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")  # only log errors

# Streaming context with a batch interval of 2 seconds.
ssc = StreamingContext(sc, 2)
# Checkpoint directory (the stateful updateStateByKey streams below rely on it).
ssc.checkpoint("checkpoint_TwitterApp")

# Text stream read from a TCP socket on port 5555.
dataStream = ssc.socketTextStream("0.0.0.0", 5555)

#### Initial Parsing of Incoming Data Stream
Split each line into tweets.  
Split each tweet into words.  
Filter the words to get only hashtags/mentions, then map each hashtag/mention to be a pair of (hashtag/mention,1).

In [None]:
# Each received line may carry several tweets separated by the literal "t_end".
tweets = dataStream.flatMap(lambda raw_line: raw_line.split("t_end"))
# NOTE(review): words are split from the raw lines, not from `tweets`, so a
# token adjacent to the "t_end" marker keeps the marker text - confirm
# against the sender's format whether this is intended.
words = dataStream.flatMap(lambda raw_line: raw_line.split(" "))

# (TOKEN, 1) pairs for every token containing '#' or '@', upper-cased so that
# counting is case-insensitive.
hashtags = words.filter(lambda token: '#' in token).map(lambda token: (token.upper(), 1))
mentions = words.filter(lambda token: '@' in token).map(lambda token: (token.upper(), 1))
# (tweet, 0.0) pairs for every tweet fragment that contains a space.
sentiment = tweets.filter(lambda fragment: ' ' in fragment).map(lambda fragment: (fragment, 0.0))

#### Aggregate Data Count
Adding the count of each hashtag/mention/sentiment to its previous count

In [None]:
# Keep a running total per key across batches for each of the three streams.
hashtags_totals, mentions_totals, sentiment_totals = (
    pair_stream.updateStateByKey(aggregate_data_count)
    for pair_stream in (hashtags, mentions, sentiment)
)

#### Process RDDs
Do processing for each RDD generated in each interval.  

In [None]:
# Attach the per-batch handler to each aggregated stream, in this order.
for _totals_stream, _rdd_handler in (
    (hashtags_totals, process_rdd_hashtags),
    (mentions_totals, process_rdd_mentions),
    (sentiment_totals, process_rdd_sentiment),
):
    _totals_stream.foreachRDD(_rdd_handler)

#### Start Streaming
Start the streaming computation.  
Wait for the streaming to finish.

In [None]:
# Start the streaming computation and block until it terminates.
try:
    ssc.start()
    ssc.awaitTermination()
except KeyboardInterrupt:
    # Interrupted by the user: stop the streaming context before reporting.
    ssc.stop()
    print('Interrupted')