In [1]:
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext

# Stream settings, named so they can be tuned in one place.
SPARK_MASTER = "local[2]"  # local mode, 2 threads (the socket receiver occupies one of them)
APP_NAME = "twitterStream"
BATCH_INTERVAL_SECONDS = 5
CHECKPOINT_DIR = "checkpoint_TwitterApp"
TWEET_SOURCE_HOST = "127.0.0.1"
TWEET_SOURCE_PORT = 5555

# getOrCreate() reuses the SparkContext that is already active in this kernel, so
# re-running the cell no longer fails with "Cannot run multiple SparkContexts at once".
# Note: when a context already exists, the conf passed here is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster(SPARK_MASTER).setAppName(APP_NAME)
)

# Streaming context that groups the input into batches of BATCH_INTERVAL_SECONDS seconds.
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
# A checkpoint directory is needed by the stateful updateStateByKey used later in the notebook.
ssc.checkpoint(CHECKPOINT_DIR)

# DStream of text lines read from the TCP socket.
lines = ssc.socketTextStream(TWEET_SOURCE_HOST, TWEET_SOURCE_PORT)

In [2]:
def aggregate_tags_count(new_values, total_sum):
    """Fold the counts of one batch into a hashtag's running total.

    Args:
        new_values: iterable of counts observed for the key in the current batch.
        total_sum: total accumulated so far; a falsy value (e.g. None) counts as 0.

    Returns:
        The sum of ``new_values`` added to the previous total.
    """
    previous_total = total_sum if total_sum else 0
    batch_total = sum(new_values)
    return batch_total + previous_total

def get_sql_context_instance(spark_context):
    """Return the shared SQLContext, creating it on the first call.

    The instance is cached in this module's globals under the key
    'sqlContextSingletonInstance'. Once cached, it is returned as-is and
    ``spark_context`` is not used.
    """
    namespace = globals()
    cache_key = 'sqlContextSingletonInstance'
    if cache_key in namespace:
        return namespace[cache_key]
    # First call: build the context and remember it for later batches.
    namespace[cache_key] = SQLContext(spark_context)
    return namespace[cache_key]

def process_rdd(time, rdd):
    """Show the ten most frequent hashtags of a batch and hand them to the dashboard step.

    Registered as the ``foreachRDD`` callback, so it receives the batch time and
    the RDD of ``(hashtag, running_count)`` pairs for that batch.

    Args:
        time: batch time; only used for the printed header line.
        rdd: RDD whose elements are read as ``w[0]`` (hashtag) and ``w[1]`` (count).

    Returns:
        None.
    """
    print("----------- %s -----------" % str(time))
    # Until the first hashtag arrives the state RDD is empty. createDataFrame()
    # cannot infer a schema from an empty RDD (it raises "RDD is empty"), and the
    # error would propagate out of the streaming callback, so skip such batches.
    if rdd.isEmpty():
        return
    # Get spark sql singleton context from the current context
    sql_context = get_sql_context_instance(rdd.context)
    # convert the RDD to Row RDD
    row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
    # create a DF from the Row RDD
    hashtags_df = sql_context.createDataFrame(row_rdd)
    # Register the dataframe as table.
    # NOTE(review): registerTempTable is deprecated since Spark 2.0 in favour of
    # createOrReplaceTempView; kept as-is because the target Spark version is not
    # visible from this notebook.
    hashtags_df.registerTempTable("hashtags")
    # get the top 10 hashtags from the table using SQL and print them
    hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
    hashtag_counts_df.show()
    # call this method to prepare top 10 hashtags DF and send them
    send_df_to_dashboard(hashtag_counts_df)
        
def send_df_to_dashboard(df):
    """Pull hashtags and their counts out of the top-hashtags DataFrame and print the counts.

    Args:
        df: DataFrame exposing ``hashtag`` and ``hashtag_count`` columns.

    Returns:
        None. The REST call mentioned in the original comment is not implemented
        here; the counts are only printed.
    """
    # Collect once and derive both lists from the same rows. Two separate
    # collect() calls run the underlying sort/limit query twice, and with ties in
    # hashtag_count the two results are not guaranteed to come back in the same order.
    rows = df.select("hashtag", "hashtag_count").collect()
    # hashtags as plain strings, index-aligned with tags_count
    # (not consumed yet: the dashboard request is still to be written)
    top_tags = [str(row.hashtag) for row in rows]
    # counts in the same row order
    tags_count = [row.hashtag_count for row in rows]
    # initialize and send the data through REST API (currently just printed)
    print(tags_count)

In [3]:
# split each tweet into words
words = lines.flatMap(lambda line: line.split(" "))
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1)
# NOTE(review): `'#' in w` also keeps tokens with '#' in the middle (e.g. URL
# fragments); consider `w.startswith('#')` if only leading-'#' tokens are wanted.
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation
ssc.start()
try:
    # block until the stream terminates or the cell is interrupted
    ssc.awaitTermination()
finally:
    # Interrupting the cell (KeyboardInterrupt) otherwise leaves the streaming job
    # and its socket receiver running in the background. Stop the SparkContext as
    # well so the notebook can be re-run from the first cell without a leftover context.
    ssc.stop(stopSparkContext=True, stopGraceFully=False)

----------- 2021-04-19 14:51:25 -----------
+---------------+-------------+
|        hashtag|hashtag_count|
+---------------+-------------+
|         #Covid|            1|
|#OxygenCylinder|            1|
|   #HospitalBed|            1|
|        #Nagpur|            1|
+---------------+-------------+

[1, 1, 1, 1]
----------- 2021-04-19 14:51:30 -----------
+---------------+-------------+
|        hashtag|hashtag_count|
+---------------+-------------+
|         #Covid|            1|
|#OxygenCylinder|            1|
|     #raailaxmi|            1|
|    #SINVEurope|            1|
|    #Christmas!|            1|
|            #C…|            1|
|      #vaccine…|            1|
|         #Katni|            1|
|         #Blood|            1|
|    #Remdesivir|            1|
+---------------+-------------+

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
----------- 2021-04-19 14:51:35 -----------
+---------------+-------------+
|        hashtag|hashtag_count|
+---------------+-------------+
|       #COVID19|     

KeyboardInterrupt: 