In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
#from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
#from pyspark.mllib.linalg import Vectors
#from pyspark.mllib.regression import LabeledPoint
#from pyspark.mllib.feature import HashingTF
#from pyspark.mllib.feature import IDF

import socket
import time
import datetime
import pickle
from collections import namedtuple
import urllib.request
import json
import pandas as pd
from pymongo import MongoClient


def tweet_count(lines, sink=None):
    """Append the number of tweets in each batch of ``lines`` to a list.

    Each socket line packs several tweets, each introduced by a ``---``
    separator (``---user+++text---user+++text...``). Splitting on that
    separator therefore yields an empty leading fragment; blank fragments
    are discarded so they are not counted as tweets.

    Args:
        lines: DStream of raw text lines from the socket source.
        sink: list that receives at most one count per batch. When omitted,
            the module-level ``tweet_cnt_li`` is used (looked up when a batch
            is processed, as before).
    """
    tweet_cnt = (lines.flatMap(lambda line: line.split('---'))
                      .filter(lambda tweet: bool(tweet.strip()))
                      .map(lambda tweet: 1)
                      .reduce(lambda x, y: x + y))

    def _record(rdd):
        target = tweet_cnt_li if sink is None else sink
        target.extend(rdd.collect())

    tweet_cnt.foreachRDD(_record)

def user_count(lines, sink=None):
    """Append the distinct user names seen in each batch of ``lines`` to a list.

    Tweets are separated by ``---`` and each tweet is ``user+++text``.
    Fragments without the ``+++`` marker (the empty leading fragment, or a
    tweet cut off at a line boundary) carry no user name and are skipped.
    Previously ``line.find('+++')`` returned -1 for them and ``line[:-1]`` was
    recorded as a bogus user.

    Args:
        lines: DStream of raw text lines from the socket source.
        sink: list that receives the distinct user names of each batch. When
            omitted, the module-level ``user_cnt_li`` is used (looked up when a
            batch is processed, as before).
    """
    users = (lines.flatMap(lambda line: line.split('---'))
                  .filter(lambda tweet: '+++' in tweet)
                  .map(lambda tweet: tweet.split('+++', 1)[0]))

    def _record(rdd):
        target = user_cnt_li if sink is None else sink
        target.extend(rdd.distinct().collect())

    users.foreachRDD(_record)

def related_hashtags(lines):
    """Publish each batch's 100 most frequent hashtags as a temp view.

    Words starting with ``#`` are lower-cased and counted per batch. The
    result replaces the ``related_hashtags_tmp`` view (columns ``Hashtag``,
    ``Count``), which ``main`` polls.

    Args:
        lines: DStream of raw text lines from the socket source.
    """
    hashtags = (lines.flatMap(lambda line: line.split())
                     .filter(lambda word: word.startswith('#'))
                     .map(lambda word: (word.lower(), 1))
                     .reduceByKey(lambda x, y: x + y))

    def _publish(rdd):
        # An explicit schema avoids schema inference, which calls rdd.first()
        # and raises on an empty batch (no hashtags in the window).
        top = (rdd.toDF('`Hashtag` string, `Count` bigint')
                  # Descending: keep the most frequent hashtags, not the least.
                  .sort('Count', ascending=False)
                  .limit(100))
        top.createOrReplaceTempView('related_hashtags_tmp')

    hashtags.foreachRDD(_publish)


def classify_tweet(tf):
    """Return ``tf`` re-weighted by inverse document frequency (TF-IDF).

    Despite its name this performs no classification: it fits an IDF model
    on ``tf`` and transforms the same data with it.

    Args:
        tf: presumably an RDD of term-frequency vectors (e.g. HashingTF
            output), which is what ``IDF.fit`` expects. TODO confirm at
            call sites.
    """
    # The module-level IDF import is commented out, so import it here to
    # avoid a NameError when this function is called.
    from pyspark.mllib.feature import IDF
    return IDF().fit(tf).transform(tf)


def data_to_db(db, start_time, hashtags, tracking_word):
    """Store the top hashtags and the tracked keyword in MongoDB.

    Args:
        db: MongoDB database handle, indexed by collection name.
        start_time: window start timestamps. Currently unused and kept for
            interface compatibility.
        hashtags: list of ``{'Hashtag': ..., 'Count': ...}`` records. It may
            be empty, in which case nothing is written to ``hashtags``.
        tracking_word: the keyword being tracked.
    """
    # Store hashtags. insert_many rejects an empty list with TypeError.
    if hashtags:
        db['hashtags'].insert_many(hashtags)

    # Store tracking_word. This is the same record shape the former
    # pandas reset_index().to_json(orient='records') round-trip produced.
    db['tracking_word'].insert_many([{'index': 0, 'Tracking_word': tracking_word}])

def _top_related_hashtags(hashtags_pd, tracking_word, limit=10):
    """Sum per-window hashtag counts and return the ``limit`` most frequent.

    The tracked keyword's own hashtag is excluded. ``related_hashtags``
    lower-cases hashtags, so the keyword is lower-cased before comparing.
    Returns a DataFrame indexed by ``Hashtag`` with a ``Count`` column,
    sorted by descending count.
    """
    own_tag = '#' + tracking_word.lower()
    others = hashtags_pd[hashtags_pd['Hashtag'] != own_tag]
    totals = others.groupby('Hashtag').sum()
    return totals.sort_values(by='Count', ascending=False).head(limit)


def main(sc, db, tracking_word):
    """Stream tweets, aggregate co-occurring hashtags and store the top ones.

    Reads ``batch_interval``, ``window_time`` and ``process_times`` as
    module-level globals assigned in the ``__main__`` block. Polls the
    ``related_hashtags_tmp`` view every ``window_time`` seconds for
    ``process_times`` rounds, keeps the 10 most frequent hashtags other than
    the tracked one, writes them via ``data_to_db``, and stops the streaming
    context together with ``sc``.

    Args:
        sc: active SparkContext. It is stopped before this function returns.
        db: MongoDB database handle, passed through to ``data_to_db``.
        tracking_word: keyword being tracked. Its own hashtag is excluded.
    """
    print('>'*30+'SPARK START'+'>'*30)

    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc, batch_interval)

    # Receive the tweets from a TCP source on this host.
    host = socket.gethostbyname(socket.gethostname())
    print("In MAIN hostname:", host)
    socket_stream = ssc.socketTextStream(host, 5555)
    lines = socket_stream.window(window_time)

    print("Printing lines")
    lines.pprint()

    # The seed row fixes the schema so each window's results can be
    # union-ed onto it.
    related_hashtags_df = sqlContext.createDataFrame([('none', 0)], ['Hashtag', 'Count'])

    # Register the per-batch hashtag aggregation.
    related_hashtags(lines)

    try:
        ssc.start()
        start_time = [datetime.datetime.now()]

        for process_cnt in range(1, process_times + 1):
            time.sleep(window_time)
            start_time.append(datetime.datetime.now())

            # The view only exists once the first batch has been processed.
            if 'related_hashtags_tmp' in sqlContext.tableNames():
                print("inside table exist")
                top_hashtags = sqlContext.sql('Select Hashtag, Count from related_hashtags_tmp')
                related_hashtags_df = related_hashtags_df.union(top_hashtags)

            print("after increment cnt ", process_cnt)

        # Collect while the SparkContext is still alive.
        related_hashtags_pd = _top_related_hashtags(related_hashtags_df.toPandas(), tracking_word)
        print("reached to end")
    finally:
        # Stop streaming first, then the underlying SparkContext, even on error.
        ssc.stop(stopSparkContext=True, stopGraceFully=False)

    print(related_hashtags_pd.head(10))

    related_hashtags_js = json.loads(related_hashtags_pd.reset_index().to_json(orient='records'))
    print(related_hashtags_js)

    # Store the data to MongoDB
    data_to_db(db, start_time, related_hashtags_js, tracking_word)

    print('>'*30+'SPARK STOP'+'>'*30)
    
    
    
if __name__=="__main__":
    # Define Spark configuration and initialize (or reuse) a SparkContext.
    conf = SparkConf()
    conf.setMaster("local[4]")
    conf.setAppName("Twitter-Hashtag-Tracking")
    sc = SparkContext.getOrCreate(conf=conf)
    # Initialize sparksql context
    sqlContext = SQLContext(sc)

    # Defaults, used only when parameters.json is absent. Previously the file
    # was loaded and then unconditionally overwritten by these values.
    # main() reads batch_interval, window_time and process_times as globals.
    tracking_word = "Trump"
    batch_interval = 60
    window_time = 60
    process_times = 5
    try:
        with open('parameters.json') as f:
            p = json.load(f)
    except FileNotFoundError:
        print("parameters.json not found, using default parameters")
    else:
        tracking_word = p['keyword']
        batch_interval = int(p['DStream']['batch_interval'])
        window_time = int(p['DStream']['window_time'])
        process_times = int(p['DStream']['process_times'])

    # Default result sinks for tweet_count() / user_count().
    tweet_cnt_li = []
    user_cnt_li = []

    print("am here in starting point")
    # Compute the whole time for displaying
    total_time = batch_interval * process_times
    print("total time", total_time)

    # Connect to the running mongod instance and reset the output collections.
    conn = MongoClient()
    db = conn['twitter']
    for collection_name in ('hashtags', 'counts', 'time', 'ratio',
                            'tracking_word', 'users', 'keywords'):
        db[collection_name].drop()

    main(sc, db, tracking_word)



am here in starting point
total time 300
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>SPARK START>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
In MAIN hostname: 192.168.56.1
Printing lines
-------------------------------------------
Time: 2020-04-20 15:26:00
-------------------------------------------
---FRESH TAKE+++RT @AmysGotBirds: Everyone's blaming Trump for mishandling COVID-19, but that's not fair! He's also raped the environment, stacked the cour…---Rachel Mendoza+++RT @ZoetKat: @45rapedkatie7 Is Fox turning against Trump?---Republican Cover Up!+++RT @JohnLukeSam1: @PressSec 761,991 Americans have the virus and 40,724 are dead. That's the bottom line! That's where the shit hits the fa…---Kyf🤨💭+++RT @NicolleDWallace: Unbelievable political ad - and beyond the political potency, if we had a real GOP this would be a governing scandal---Renee Sandomenico+++@katiecouric Shut up Katie the America people want to hear him and you are irrelevant and a Democrat anti Trump mouth piece!---LeAnna Ritter+++RT @kyledchene

inside table exist
after increment cnt  3
inside table exist
after increment cnt  4
inside table exist
after increment cnt  5
-------------------------------------------
Time: 2020-04-20 15:28:00
-------------------------------------------
Trump Administration ensured America's Truckers could co…---Diane+++RT @_WilliamsonBen: Brazen hypocrisy here—in the 2 weeks before March 16 when POTUS enacted “slow the spread” policies, Biden held 8 rallie…---Bronwyn+++RT @JackFinnResists: Has anyone figured out why Trump said he had hundreds of state governors calling him but we only have 50? 

No? okay..…---TheAwkwardRepublican+++RT @Chris39962442: #OilPrice #OilPrices 

HEY #Trump’s, after bragging about all the great oil deals he made with Putin of Russia, MBS of S…---Janet Byers Manning+++RT @CaslerNoel: The Trump Administration thought Larry Kudlow’s wife was more deserving of financial relief than the US Postal Service. @re…---david henderson+++RT @EpochTimes: “We need to know exactly what d