In [None]:
import json
import os
import socket
import sys
import time
import traceback

import pyspark
import requests
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row,SQLContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

In [None]:
def send_df_to_csv(df, url='', csv_path=r'\twit_file.csv', timeout=10):
    """Append the hashtag counts in ``df`` to a CSV file and optionally POST them.

    Args:
        df: DataFrame-like object whose ``toPandas()`` result has the columns
            ``hashtag`` and ``hashtag_count``.
        url: Endpoint that receives the labels and counts as form data.
            The POST is skipped while this is empty (the default).
        csv_path: File the rows are appended to. The default is a raw string,
            so the leading ``\\t`` is a backslash followed by ``t``, not a tab.
        timeout: Seconds to wait for the HTTP endpoint before giving up.

    Raises:
        Whatever ``requests.post`` raises when ``url`` is set and the request
        fails. The CSV has already been written at that point.
    """
    # Materialise the batch once so the CSV and the POST are built from the
    # same rows in the same order.
    batch = df.toPandas()

    # Write the header only when starting a new (or empty) file; appending it
    # on every batch would scatter header rows through the data.
    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    batch.to_csv(csv_path, header=needs_header, mode='a', index=False)

    # Without an endpoint there is nothing to send; posting to an empty URL
    # would only raise.
    if not url:
        return

    # tolist() yields plain Python values, so str() renders them the same way
    # as values taken from collected rows.
    top_tags = [str(tag) for tag in batch["hashtag"].tolist()]
    tags_count = batch["hashtag_count"].tolist()
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    requests.post(url, data=request_data, timeout=timeout)

In [None]:
def get_sql_context_instance(spark_context):
    """Return the module-wide SQLContext, creating it on first use.

    The instance is stored in this module's globals under the key
    ``sqlContextSingletonInstance``; later calls return that stored object
    and ignore ``spark_context``.
    """
    module_vars = globals()
    key = 'sqlContextSingletonInstance'
    # Fast path: an instance was already created by an earlier call.
    if key in module_vars:
        return module_vars[key]
    instance = SQLContext(spark_context)
    module_vars[key] = instance
    return instance
def process_rdd(time, rdd):
    """Print and export the ten most frequent hashtags of one streaming batch.

    Args:
        time: Batch timestamp; only used for the banner line.
        rdd: RDD of ``(hashtag, count)`` pairs for the batch.

    Errors raised while processing are printed as a traceback instead of
    being propagated, so that one failing batch does not raise out of the
    callback.
    """
    print("----------- %s -----------" % str(time))
    try:
        # createDataFrame infers the schema from the data and fails on an
        # empty RDD, so a batch without any hashtags is skipped quietly.
        if rdd.isEmpty():
            return
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        # convert the RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
        # create a DF from the Row RDD
        df_hash = sql_context.createDataFrame(row_rdd)
        # Register the dataframe as a temporary view (createOrReplaceTempView
        # is the non-deprecated replacement for registerTempTable)
        df_hash.createOrReplaceTempView("hashtags")
        # get the top 10 hashtags from the table using SQL and print them
        hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
        hashtag_counts_df.show()
        # call this method to prepare top 10 hashtags DF and send them
        send_df_to_csv(hashtag_counts_df)
    except Exception:
        # Exception (not a bare except) lets KeyboardInterrupt / SystemExit
        # through so the job can still be interrupted.
        print(traceback.format_exc())

In [None]:
# Spark configuration carrying the application name
conf = SparkConf().setAppName("TwitterStrimApp")
# Spark context built from that configuration; log output limited to errors
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# Streaming context on top of the Spark context with a batch interval of 720 seconds
ssc = StreamingContext(sparkContext=sc, batchDuration=720)
# Checkpoint directory, set to allow RDD recovery
ssc.checkpoint("checkpoint_tweet")
# Text stream read from a TCP socket on localhost:9009
dataStream = ssc.socketTextStream(hostname="localhost", port=9009)

In [None]:
# Break every incoming line into space-separated tokens
words = dataStream.flatMap(lambda tweet: tweet.split(" "))
# Keep only tokens containing '#', then pair each one with a count of 1
hashtags = (
    words
    .filter(lambda token: '#' in token)
    .map(lambda tag: (tag, 1))
)
# Sum the counts per hashtag within each batch
total_tags = hashtags.reduceByKey(lambda left, right: left + right)
# Hand the RDD of every batch interval to process_rdd
total_tags.foreachRDD(process_rdd)
# Launch the streaming computation
ssc.start()
# Block until the stream terminates or the 3000-second timeout elapses
ssc.awaitTerminationOrTimeout(3000)