In [None]:
# Importing all the libraries
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
import pandas as pd
from textblob import TextBlob
import pprint
import re
import json
import string
from datetime import datetime
import numpy as np

In [None]:
# Libraries for display
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
from threading import Timer,Thread,Event
# Only works for Jupyter Notebooks!
%matplotlib inline

In [None]:
# Spark entry points: one SparkContext (errors-only logging) driving a
# StreamingContext that cuts the input into 30-second micro-batches.
BATCH_INTERVAL_SECS = 30

sc = SparkContext(appName='TwitterAnalysis')
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, BATCH_INTERVAL_SECS)

In [None]:
# Stateful operations below (updateStateByKey) need a checkpoint directory;
# "checkpoint" is resolved relative to the working directory.
# Raw tweets arrive as text lines on a TCP socket (each line is parsed as JSON
# by get_json later on).
STREAM_HOST = '172.31.38.183'
STREAM_PORT = 7777

ssc.checkpoint("checkpoint")
socket_stream = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)
sqlContext = SQLContext(sc)


In [None]:
# Row types for the Spark SQL temp tables; each pairs a label with a count.
from collections import namedtuple

fields = ("user", "count")          # Tweet
fields_1 = ("trends", "count")      # Trend
fields_2 = ("sentiment", "count")   # Sentiment
fields_3 = ("language", "count")    # Language
fields_4 = ("geo", "count")         # Geo

Tweet = namedtuple('Tweet', fields)
Trend = namedtuple('Trend', fields_1)
Sentiment = namedtuple('Sentiment', fields_2)
Language = namedtuple('Language', fields_3)
Geo = namedtuple('Geo', fields_4)

In [None]:
# Get the json object
def get_json(myjson):
    """Parse one line of the socket stream as a JSON object.

    Returns the decoded dict, or False when the line is not valid JSON or
    does not decode to a JSON object. Callers drop the False sentinel and
    then test ``'created_at' in post``, which only makes sense on a dict
    (it raises TypeError on a decoded number or null).
    """
    try:
        json_object = json.loads(myjson)
    except (TypeError, ValueError):
        # ValueError: malformed JSON; TypeError: input was not a string
        # (e.g. None). The plain ``except X:`` form is valid on Python 2 and 3,
        # unlike the Python-2-only ``except X, e:``.
        return False
    if not isinstance(json_object, dict):
        return False
    return json_object
    

In [None]:
# get the coordinates:
def get_coord2(post):
    """Return a 2-element coordinate tuple for a tweet dict.

    Uses ``post['coordinates']['coordinates']`` when ``post['coordinates']``
    is not None. Otherwise returns the mean of the vertices of the first ring
    of ``post['place']['bounding_box']['coordinates']``. Falls back to (0, 0)
    when neither source is usable (null/missing keys, empty polygon).
    """
    try:
        if post['coordinates'] is not None:
            return tuple(post['coordinates']['coordinates'])
        ring = post['place']['bounding_box']['coordinates'][0]
        # Average over the actual number of vertices (rather than assuming 4)
        # and avoid ``reduce``, which is not a builtin on Python 3.
        n_points = float(len(ring))
        return (sum(p[0] for p in ring) / n_points,
                sum(p[1] for p in ring) / n_points)
    except (TypeError, KeyError, IndexError, ZeroDivisionError):
        return (0, 0)

In [None]:
# Return the updated count
def updateTotalCount(currentCount, countState):
    """updateStateByKey callback: fold this batch's counts into the running total.

    currentCount -- iterable of the key's new counts in the current batch
    countState   -- previous running total, or None the first time a key is seen
    """
    total = 0 if countState is None else countState
    for value in currentCount:
        total += value
    return total

In [None]:
# Global Variable Declaration

# Start time of the run (createGraph compares against it)
t0 = datetime.now()
# Regex matching any single ASCII punctuation character
remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation))
# Tokens dropped by tokenize(): a few noise tokens plus common English stopwords
stopwords = [
    # noise tokens
    u'rt', u'que', u'amp', u'get', u're',
    # English stopwords
    u'i', u'me', u'my', u'myself', u'we', u'our', u'ours', u'ourselves', u'you',
    u'your', u'yours', u'yourself', u'yourselves', u'he', u'him', u'his', u'himself',
    u'she', u'her', u'hers', u'herself', u'it', u'its', u'itself', u'they', u'them',
    u'their', u'theirs', u'themselves', u'what', u'which', u'who', u'whom', u'this',
    u'that', u'these', u'those', u'am', u'is', u'are', u'was', u'were', u'be',
    u'been', u'being', u'have', u'has', u'had', u'having', u'do', u'does', u'did',
    u'doing', u'a', u'an', u'the', u'and', u'but', u'if', u'or', u'because', u'as',
    u'until', u'while', u'of', u'at', u'by', u'for', u'with', u'about', u'against',
    u'between', u'into', u'through', u'during', u'before', u'after', u'above',
    u'below', u'to', u'from', u'up', u'down', u'in', u'out', u'on', u'off', u'over',
    u'under', u'again', u'further', u'then', u'once', u'here', u'there', u'when',
    u'where', u'why', u'how', u'all', u'any', u'both', u'each', u'few', u'more',
    u'most', u'other', u'some', u'such', u'no', u'nor', u'not', u'only', u'own',
    u'same', u'so', u'than', u'too', u'very', u's', u't', u'can', u'will', u'just',
    u'don', u'should', u'now',
]
# Twitter `lang` code -> display name used in the plots.
# 'und' is Twitter's code for an undetermined language (not Afghan).
lang_dict = {'en': 'English', 'pt': 'Portuguese', 'de': 'German', 'es': 'Spanish',
             'nl': 'Dutch', 'ru': 'Russian', 'ja': 'Japanese', 'zh-cn': 'Chinese',
             'ko': 'Korean', 'hi': 'Hindi', 'fr': 'French', 'cs': 'Czech', 'ur': 'Urdu',
             'und': 'Undetermined', 'tr': 'Turkish', 'it': 'Italian', 'ar': 'Arabic',
             'el': 'Greek', 'da': 'Danish', 'fa': 'Persian', 'pl': 'Polish',
             'ro': 'Romanian', 'sv': 'Swedish', 'th': 'Thai', 'uk': 'Ukrainian',
             'bn': 'Bengali', 'fi': 'Finnish', 'fil': 'Filipino', 'hu': 'Hungarian',
             'id': 'Indonesian', 'vi': 'Vietnamese', 'zh-tw': 'Chinese (Traditional)',
             'msa': 'Malay', 'he': 'Hebrew', 'no': 'Norwegian'}

In [None]:
# Return language
def lang(lang):
    """Map a Twitter language code to its display name via lang_dict.

    Codes missing from lang_dict map to a single space " ", which callers
    filter out.
    """
    return lang_dict.get(lang, " ")

In [None]:
# Return the country name in a compact form for plotting:
# non-ASCII characters are dropped and spaces removed, e.g. United States -> UnitedStates.
# Names with more than two words are reduced to their upper-cased initials,
#  e.g. Republic of China -> ROC, Republic of Philippines -> ROP.
def country(country):
    """Return a compact ASCII label for a Twitter place country name.

    Returns '' for None/empty input or for names without ASCII characters
    (callers filter out '').
    """
    if not country:
        return ''
    # Drop non-ASCII characters, then decode back to text so the result is a
    # str on Python 3 (bytes.replace(" ", "") would raise TypeError there).
    ascii_name = country.encode('ascii', 'ignore').decode('ascii')
    words = ascii_name.split()
    if len(words) > 2:
        return ''.join(word[0].upper() for word in words)
    return ascii_name.replace(" ", "")




In [None]:
# Return word tokens
def tokenize(text):
    """Split tweet text into lower-cased content-word tokens.

    URLs are removed, non-ASCII characters dropped, every ASCII punctuation
    character replaced by a space (remove_spl_char_regex), and tokens that
    are in ``stopwords`` or at most two characters long are discarded.
    Returns [] for empty or None text.
    """
    if not text:
        return []
    # Drop non-ASCII characters, decoding back to text so the str regexes
    # below also work on Python 3 (they raise TypeError on bytes).
    text = text.encode('ascii', 'ignore').decode('ascii')
    # Remove URLs
    text = re.sub('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*(),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
    # Replace punctuation with spaces, then normalise case
    text = remove_spl_char_regex.sub(" ", text).lower()
    # Punctuation is gone at this point, so only length and stopwords matter.
    return [word for word in text.split()
            if len(word) > 2 and word not in stopwords]
 

In [None]:
# Get Tweet polarity
def get_tweet_sentiment(tokens):
    """Classify a tokenized tweet as 'positive', 'neutral' or 'negative'.

    The tokens are joined with single spaces and scored with TextBlob:
    polarity > 0 is positive, exactly 0 is neutral, anything else negative.
    """
    polarity = TextBlob(' '.join(map(str, tokens))).sentiment.polarity
    if polarity > 0:
        return 'positive'
    if polarity == 0:
        return 'neutral'
    return 'negative'
 

In [None]:
# Main DStreams shared by the per-metric pipelines below.
# The window length (30 s) equals the batch interval, so each RDD is one batch.
lines = socket_stream.window(30)


def _tweet_fields(post):
    """(screen_name, followers_count, lang, statuses_count, text) of one tweet dict."""
    user = post["user"]
    return (user["screen_name"], user["followers_count"], post["lang"],
            user["statuses_count"], post["text"])


def _with_sentiment(text):
    """(text, tokens, sentiment label) for one tweet text."""
    tokens = tokenize(text)
    return (text, tokens, get_tweet_sentiment(tokens))


dstream_tweets = (lines
                  .map(get_json)
                  .filter(lambda post: post != False)         # drop unparsable lines
                  .filter(lambda post: 'created_at' in post)  # keep real tweets only
                  .map(_tweet_fields)
                  # text and language must not be a single space
                  .filter(lambda rec: rec[4] != ' ' and rec[2] != ' '))

tokenized_tweets = dstream_tweets.map(lambda rec: _with_sentiment(rec[4]))

# Uncomment to inspect the main DStreams
#dstream_tweets.pprint()
#tokenized_tweets.pprint()



In [None]:
# Top 5 languages: running count of tweets per language display name,
# highest first, formatted for pprint.
language = (dstream_tweets
            .map(lambda rec: lang(rec[2]))
            .filter(lambda name: name != ' ')           # unknown language codes
            .map(lambda name: (name, 1))
            .reduceByKey(lambda a, b: a + b)            # per-batch count
            .updateStateByKey(updateTotalCount)         # running total
            .transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
            .map(lambda kv: "Popular Language: %s\tCount: %s" % (kv[0], kv[1])))

# Uncomment to inspect the stream
#language.pprint(5)

In [None]:
# Top 20 influential personalities: users ranked by follower count.
# A user who tweets several times in one batch would otherwise be listed once
# per tweet, so keep one entry per screen name (its highest follower count).
influential = (dstream_tweets
               .map(lambda rec: (rec[0], rec[1]))      # (screen_name, followers_count)
               .reduceByKey(max)                       # one row per user
               .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
               .map(lambda x: "Influential People: %s\tFollower's Count: %s" % (x[0], x[1])))

# Uncomment to inspect the stream
#influential.pprint(20)



In [None]:
# Top tweeting user
#
# Note: for the most active user we take a different path. Instead of counting
# each user's tweets over the 1-hour run, we use the tweet's statuses count:
# the number of Tweets (including retweets) the user has issued.
#
# This is not an exact real-time measure, but we believe it is a good
# indication of which users are tweeting right now and are very active.
#
# A user can appear several times in one batch, so keep one row per screen
# name (its highest statuses count).
top_tweeting_user = (dstream_tweets
                     .map(lambda x: (x[0], x[3]))       # (screen_name, statuses_count)
                     .reduceByKey(max)                  # one row per user
                     .transform(lambda twu: twu.sortBy(lambda x: x[1], ascending=False))
                     .map(lambda x: "Top Tweeting User: %s\tTweet Count: %s" % (x[0], x[1])))

# Uncomment to inspect the stream
#top_tweeting_user.pprint(50)

In [None]:
# Most popular tweets
# A tweet that has a 'retweeted_status' key is a retweet; that object holds the
# original tweet, whose retweet_count is used to rank the retweet text.
popular_tweets = (lines
                  .map(get_json)
                  .filter(lambda post: post != False)
                  .filter(lambda post: 'created_at' in post)        # valid tweets only
                  .filter(lambda post: 'retweeted_status' in post)  # retweets only
                  .map(lambda post: (post["text"], post["retweeted_status"]["retweet_count"]))
                  .filter(lambda pair: pair[1] > 0)
                  .transform(lambda rdd: rdd.sortBy(lambda pair: pair[1], ascending=False))
                  .map(lambda pair: "Most Retweeted: %s\tRetweet Count: %s" % (pair[0], pair[1])))

# Uncomment to inspect the stream
#popular_tweets.pprint(100)

In [None]:
# Top 5 words: most frequent tokens of the current batch only.
# Counts are not carried across batches (no updateStateByKey here).
words = (tokenized_tweets
         .flatMap(lambda rec: rec[1])                      # the token list
         .map(lambda token: (token.lower(), 1))
         .reduceByKey(lambda a, b: a + b)
         .transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
         .map(lambda kv: "Top Word: %s\tCount: %s" % (kv[0], kv[1])))

# Uncomment to inspect the stream
#words.pprint(5)

In [None]:
# Top 10 trends: running count of hashtags (case-insensitive).
# Hashtags are counted per batch and then accumulated with updateStateByKey.
# Accumulating the output of a 60 s window sliding every 30 s instead would add
# each batch's hashtags to the running total twice.
trends = (dstream_tweets
          .flatMap(lambda rec: rec[4].split(" "))
          .filter(lambda word: word.lower().startswith("#"))   # hashtags only
          .map(lambda word: (word.lower(), 1))
          .reduceByKey(lambda a, b: a + b)                     # per-batch count
          .updateStateByKey(updateTotalCount)                  # running total
          .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
          .map(lambda x: "Top Trend: %s\tCount: %s" % (x[0], x[1])))

# Uncomment to inspect the stream
#trends.pprint(10)

In [None]:
# Overall tweet sentiment: running count of tweets per sentiment label
# (the label computed in tokenized_tweets), highest first.
sentiments = (tokenized_tweets
              .map(lambda rec: (rec[2], 1))
              .reduceByKey(lambda a, b: a + b)       # positive / neutral / negative
              .updateStateByKey(updateTotalCount)    # running total
              .transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
              .map(lambda kv: "Top Sentiment: %s\tCount: %s" % (kv[0], kv[1])))

# Uncomment to inspect the stream
#sentiments.pprint()




In [None]:

# Top 7 countries
# Most tweets (about 90% in our observation) carry no place, so this stream
# covers only the geotagged minority. Because it is not representative, it is
# built directly from `lines` rather than derived from the main dstream.
country_tweets = (lines
                  .map(get_json)
                  .filter(lambda post: post != False)
                  .filter(lambda post: 'created_at' in post)
                  .filter(lambda post: post["place"] is not None)
                  .map(lambda post: country(post["place"]["country"]))
                  .filter(lambda name: name != '')
                  .map(lambda name: (name, 1))
                  .reduceByKey(lambda a, b: a + b)          # per-batch count
                  .updateStateByKey(updateTotalCount)       # running total
                  .transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False)))

# Uncomment to inspect the stream
# country_tweets.pprint(7)

In [None]:
# Coordinates: the first two components of get_coord2 for every tweet that has a place.
coordinates = (lines
               .map(get_json)
               .filter(lambda post: post != False)
               .filter(lambda post: 'created_at' in post)
               .filter(lambda post: post["place"] is not None)
               .map(get_coord2)
               .map(lambda coord: (coord[0], coord[1])))

# Uncomment to inspect the stream
#coordinates.pprint()

In [None]:
# For each dashboard metric: build (label, count) pairs, wrap them in the
# matching namedtuple and register the result as a Spark SQL temp table
# (queried by createGraph).

def _register_table(rdd, table, limit=None):
    """Register `rdd` (namedtuples with a `count` field) as temp table `table`.

    Rows are sorted by descending count and optionally truncated to `limit`.
    Empty RDDs are skipped: toDF() cannot infer a schema from an empty RDD,
    so the table keeps its previous contents until data arrives.
    """
    if rdd.isEmpty():
        return
    df = rdd.toDF().sort(desc("count"))
    if limit is not None:
        df = df.limit(limit)
    df.registerTempTable(table)


# Tweets table: users by follower count, one row per user
(dstream_tweets.map(lambda rec: (rec[0], rec[1]))
    .reduceByKey(max)
    .map(lambda rec: Tweet(rec[0], rec[1]))
    .foreachRDD(lambda rdd: _register_table(rdd, "tweets", 10)))

# Trends table: per-batch hashtag counts accumulated into a running total
# (accumulating a 60 s window that slides every 30 s would count each hashtag twice)
(dstream_tweets.flatMap(lambda rec: rec[4].split(" "))
    .filter(lambda word: word.lower().startswith("#"))
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .updateStateByKey(updateTotalCount)
    .map(lambda rec: Trend(rec[0], rec[1]))
    .foreachRDD(lambda rdd: _register_table(rdd, "trends", 10)))

# Top Users table: users by statuses count, one row per user
(dstream_tweets.map(lambda rec: (rec[0], rec[3]))
    .reduceByKey(max)
    .map(lambda rec: Tweet(rec[0], rec[1]))
    .foreachRDD(lambda rdd: _register_table(rdd, "topUsers", 10)))

# Sentiments table: running count per sentiment label
(tokenized_tweets.map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .updateStateByKey(updateTotalCount)
    .map(lambda rec: Sentiment(rec[0], rec[1]))
    .foreachRDD(lambda rdd: _register_table(rdd, "sentiments")))

# Languages table: running count per language display name, top 5
(dstream_tweets.map(lambda rec: lang(rec[2]))
    .filter(lambda name: name != ' ')
    .map(lambda name: (name, 1))
    .reduceByKey(lambda a, b: a + b)
    .updateStateByKey(updateTotalCount)
    .map(lambda rec: Language(rec[0], rec[1]))
    .foreachRDD(lambda rdd: _register_table(rdd, "languages", 5)))

# Geo table: country_tweets already holds running (country, count) pairs, top 5
(country_tweets.map(lambda rec: Geo(rec[0], rec[1]))
    .foreachRDD(lambda rdd: _register_table(rdd, "geos", 5)))



In [None]:
# Aggregated DStream: the first (top-ranked) record of each metric's RDD,
# unioned in a fixed order and printed every batch.

def _head(dstream):
    """DStream keeping only the first record of each RDD of `dstream`."""
    return (dstream
            .transform(lambda rdd: rdd.zipWithIndex())
            .filter(lambda rec_idx: rec_idx[1] == 0)
            .map(lambda rec_idx: rec_idx[0]))


_top_geo = _head(country_tweets).map(lambda x: "Top Geo: %s\tTweet Volume: %s" % (x[0], x[1]))

Aggregated = _head(language)
for _stream in (_head(influential), _head(top_tweeting_user), _head(words),
                _head(trends), _head(sentiments), _top_geo, _head(popular_tweets)):
    Aggregated = Aggregated.union(_stream)

# Comment this out in case printing any other dstream
Aggregated.pprint()


In [None]:
# Class for a perpetual (self re-arming) timer
class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds on a background Timer thread.

    A fresh ``Timer`` is armed only after ``hFunction`` returns, so runs never
    overlap. ``cancel()`` stops all future runs, including when it is called
    from inside ``hFunction`` itself.
    """

    def __init__(self, t, hFunction):
        from threading import Lock
        self.t = t                  # interval in seconds
        self.hFunction = hFunction  # zero-argument callable
        self._lock = Lock()         # guards _cancelled and re-arming
        self._cancelled = False
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        if self._cancelled:
            return
        self.hFunction()
        with self._lock:
            # Without this check, a cancel() issued during hFunction() only
            # cancelled the timer that was already running, and a new one was
            # armed anyway, so the timer never stopped.
            if self._cancelled:
                return
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        with self._lock:
            self._cancelled = True
            self.thread.cancel()



In [None]:
# Create the graph dynamically

# The graph runs on a background timer thread and refreshes itself automatically

# 6 barplots in total
def createGraph():
    """Redraw the 3x2 dashboard of bar plots from the Spark SQL temp tables.

    Once 60 minutes have passed since ``t0`` it calls stop_streaming() instead
    of plotting. Errors while querying or plotting (e.g. a temp table that
    has not been registered yet) are reported and that refresh is skipped.
    Always returns True.
    """
    total_mins = 60
    mins = int((datetime.now() - t0).total_seconds() / 60)
    if mins >= total_mins:
        stop_streaming()
        return True

    # (temp-table query, x column, y column, subplot (row, col), title)
    panels = [
        ('Select user, count from tweets', "count", "user", (0, 0), 'User vs Followers Count'),
        ('Select trends, count from trends', "count", "trends", (2, 0), 'Twitter Trend Plot'),
        ('Select user, count from topUsers', "count", "user", (1, 0), 'User vs Tweets Count'),
        ('Select sentiment,count from sentiments', "sentiment", "count", (0, 1), 'Tweets Sentiment'),
        ('Select language,count from languages', "language", "count", (1, 1), 'Most Tweeted Language'),
        ('Select geo,count from geos', "geo", "count", (2, 1), 'Tweeting Geos'),
    ]
    try:
        # Query every table first so a failure leaves the previous figure on screen.
        frames = [sqlContext.sql(query).toPandas() for query, _, _, _, _ in panels]
        display.clear_output(wait=True)
        # seaborn no longer re-exports pyplot as `sns.plt`; use matplotlib directly.
        fig, ax = plt.subplots(figsize=(15, 15), nrows=3, ncols=2)
        try:
            for (_, x_col, y_col, (row, col), title), frame in zip(panels, frames):
                sns.barplot(x=x_col, y=y_col, data=frame, ax=ax[row][col]).set_title(
                    title, fontsize=10, color="r", alpha=0.5)
            plt.show()
        finally:
            # Release the figure; otherwise one figure leaks per refresh.
            plt.close(fig)
    except Exception as exc:
        # Best effort: keep the refresh loop alive, but report why it was skipped.
        print("Graph refresh skipped: %r" % (exc,))
    return True

In [None]:
# Start the streaming computation, then redraw the dashboard on a background
# timer every 40 seconds (first redraw 40 seconds from now).
GRAPH_REFRESH_SECS = 40
ssc.start()
t = perpetualTimer(GRAPH_REFRESH_SECS, createGraph)
t.start()

In [None]:
# Stopping the stream
def stop_streaming():
    """Cancel the dashboard refresh timer `t`, then stop the StreamingContext `ssc`."""
    t.cancel()
    ssc.stop()
    print("Hard Stop")
        