In [None]:
# Install third-party packages through IPython shell escapes (`!`), one pip
# invocation per line so a failure of one install does not prevent the others.
# `requests` is used further down to POST results to the dashboard endpoint;
# the remaining packages are not referenced by name in the cells shown here.
# NOTE(review): none of the installs is version-pinned, and `!pip` runs whichever
# pip is first on the shell PATH, which may not be the kernel's environment — confirm.
!pip install requests requests_oauthlib
!pip install pyspark-utils
!pip install pypandoc
!pip install Twitter.utils

In [None]:
from __future__ import print_function

import sys

import requests
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

In [None]:
# Spark entry points for the demo. The streaming context divides the incoming
# data into batches of BATCH_INTERVAL_SECONDS seconds.
APP_NAME = "StreamingDemo"
BATCH_INTERVAL_SECONDS = 10

sc = SparkContext(appName=APP_NAME)
# Restrict Spark's own logging to ERROR level.
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)

In [None]:
# Text source for the notebook: a DStream fed by a TCP socket on the local
# machine. Each line of text received becomes one record.
STREAM_HOST = "127.0.0.1"
STREAM_PORT = 9000
socket_stream = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)

In [None]:
# Count hashtags over a 60-second window of the socket stream.
lines = socket_stream.window(60)

# Split every line into words, keep the words that start with "#", lowercase
# them so "#Spark" and "#spark" share one counter, then sum occurrences per tag.
hashtags = (
    lines.flatMap(lambda text: text.split(" "))
    .filter(lambda word: word.startswith("#"))
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
)

# Order each batch by count (descending) and break ties alphabetically by tag.
# One composite sort key is used on purpose: chaining two sortBy() calls
# re-shuffles the data on the second sort, so the ordering produced by the
# first one is not guaranteed to survive among equal counts.
# NOTE: the variable keeps its existing name for compatibility; it holds
# (hashtag, count) pairs, not authors.
author_counts_sorted_dstream = hashtags.transform(
    lambda rdd: rdd.sortBy(lambda pair: (-pair[1], pair[0]))
)
author_counts_sorted_dstream.pprint()

# Start receiving and processing data. Spark does not allow new DStream
# operations to be added to a context once it has been started, so any further
# pipeline on `ssc` has to be defined before this call.
ssc.start()

# Block this cell until the streaming computation is stopped or fails.
ssc.awaitTermination()


In [None]:
def aggregate_tags_count(new_values, total_sum):
    """Fold the counts of the current batch into the running total for one key.

    Written to be passed to ``updateStateByKey``.

    Args:
        new_values: Iterable of counts observed for the key in this batch
            (may be empty).
        total_sum: Total accumulated so far; a falsy value such as ``None``
            (no state yet) is treated as 0.

    Returns:
        The sum of ``new_values`` plus the previous total.
    """
    batch_total = sum(new_values)
    previous_total = 0 if not total_sum else total_sum
    return batch_total + previous_total

In [None]:
def get_sql_context_instance(spark_context):
    """Return the module-wide SQLContext, creating it on the first call.

    The instance is cached in this module's globals under the key
    ``'sqlContextSingletonInstance'``; later calls return that cached object
    and ignore ``spark_context``.

    Args:
        spark_context: SparkContext used to build the SQLContext when no
            instance has been cached yet.

    Returns:
        The cached SQLContext instance.
    """
    namespace = globals()
    try:
        return namespace['sqlContextSingletonInstance']
    except KeyError:
        # First call: build the context, cache it, then hand it back.
        instance = SQLContext(spark_context)
        namespace['sqlContextSingletonInstance'] = instance
        return instance

def process_rdd(time, rdd):
    """Print the ten most frequent hashtags of one batch and send them to the dashboard.

    Written to be passed to ``DStream.foreachRDD``.

    Args:
        time: Batch time supplied by Spark Streaming; only used in the
            printed header line.
        rdd: RDD of ``(hashtag, count)`` pairs.

    Returns:
        None. Any ``Exception`` raised while processing the batch is printed
        to stdout and not re-raised.
    """
    print("----------- %s -----------" % str(time))
    try:
        # createDataFrame() infers the schema from the data and fails on an
        # RDD without rows; an empty batch has nothing to rank or send.
        if rdd.isEmpty():
            return
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        # convert the RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
        # create a DF from the Row RDD
        hashtags_df = sql_context.createDataFrame(row_rdd)
        # Register the dataframe as table
        hashtags_df.registerTempTable("hashtags")
        # get the top 10 hashtags from the table using SQL and print them
        hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
        hashtag_counts_df.show()
        # call this method to prepare top 10 hashtags DF and send them
        send_df_to_dashboard(hashtag_counts_df)
    except Exception as exc:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate; report the message, not just the class.
        print("Error: %s" % repr(exc))

In [None]:
def send_df_to_dashboard(df):
    """POST the hashtags and counts contained in ``df`` to the dashboard endpoint.

    Args:
        df: DataFrame with ``hashtag`` and ``hashtag_count`` columns.

    Returns:
        None.

    Raises:
        Whatever ``requests.post`` raises, e.g. on connection failure or when
        the timeout expires.
    """
    # Collect once so labels and counts come from the same rows in the same
    # order; two separate collect() calls run the query twice and may order
    # rows with equal counts differently.
    rows = df.select("hashtag", "hashtag_count").collect()
    # extract the hashtags and convert them into array
    top_tags = [str(row.hashtag) for row in rows]
    # extract the counts and convert them into array
    tags_count = [row.hashtag_count for row in rows]
    # initialize and send the data through REST API
    url = 'http://localhost:5001/updateData'
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    # Bound the wait so an unresponsive dashboard cannot stall the caller forever.
    requests.post(url, data=request_data, timeout=5)

In [None]:
# updateStateByKey() is a stateful operation: Spark requires a checkpoint
# directory to be configured on the streaming context before the stream starts.
ssc.checkpoint("checkpoint_StreamingDemo")

# split each incoming line into words
words = socket_stream.flatMap(lambda line: line.split(" "))
# keep only the words that start with '#' (a '#' in the middle of a word, e.g. a
# URL fragment, is not a hashtag), then map each hashtag to a pair of (hashtag, 1)
hashtags = words.filter(lambda w: w.startswith('#')).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation; every DStream operation above has to be
# registered before this call, because Spark rejects additions to a context
# that is already running
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()