In [6]:
"""
This module is the spark streaming analysis process.


Usage:
    If used with dataproc:
        gcloud dataproc jobs submit pyspark --cluster <Cluster Name> twitterHTTPClient.py

    Create a dataset in BigQurey first using
        bq mk bigdata_sparkStreaming

    Remeber to replace the bucket with your own bucket name


Todo:
    1. hashtagCount: calculate accumulated hashtags count
    2. wordCount: calculate word count every 60 seconds
        the word you should track is listed below.
    3. save the result to google BigQuery

"""

import subprocess
import time

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

# global variables
bucket = 'surbhi-mis586-asgnmt4'  # TODO: here, replace with your own bucket name
output_directory_hashtags = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/hashtagsCount'.format(
    bucket)
output_directory_wordcount = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/wordcount'.format(
    bucket)

# output table and columns name
output_dataset = 'dataset1'  #TODO: the name of your dataset in BigQuery
    # (Create a BigQuery dataset first using bq mk <your dataset name> in Google Cloud SDK Shell or any alternative approaches.)
output_table_hashtags = 'hashtags'
columns_name_hashtags = ['hashtags', 'count']
output_table_wordcount = 'wordcount'
columns_name_wordcount = ['word', 'count', 'time']

# parameter
IP = 'localhost'  # ip port
PORT = 9001  # port

STREAMTIME = 600  # time that the streaming process runs
#STREAMTIME = 20  # for test

WORD = ['data', 'spark', 'ai', 'movie', 'coronavirus']  # the words you should filter and do word count


# Helper functions
def saveToStorage(rdd, output_directory, columns_name, mode):
    """
    Save each RDD in this DStream to google storage
    Args:
        rdd: input rdd
        output_directory: output directory in google storage
        columns_name: columns name of dataframe
        mode: mode = "overwirte", overwirte the file
              mode = "append", append data to the end of file
    """
    if not rdd.isEmpty():
        (rdd.toDF(columns_name)
         .write.save(output_directory, format="json", mode=mode))


def saveToBigQuery(sc, output_dataset, output_table, directory):
    """
    Put temp streaming json files in google storage to google BigQuery
    and clean the output files in google storage
    """
    files = directory + '/part-*'
    subprocess.check_call(
        'bq load --source_format NEWLINE_DELIMITED_JSON '
        '--replace '
        '--autodetect '
        '{dataset}.{table} {files}'.format(
            dataset=output_dataset, table=output_table, files=files
        ).split())
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
        output_path, True)


def hashtagCount(words):
    """
    Calculate the accumulated hashtags count sum from the beginning of the stream
    and sort it by descending order of the count.
    Ignore case sensitivity when counting the hashtags:
        "#Ab" and "#ab" are considered to be a same hashtag
    You have to:
    1. Filter out the word that is hashtags.
       Hashtag usually start with "#" and followed by a serious of alphanumeric
    2. map (hashtag) to (hashtag, 1)
    3. sum the count of current DStream state and previous state
    4. transform unordered DStream to a ordered Dstream
    Hints:
        You may use regular expression to filter the words
        You can take a look at updateStateByKey and transform transformations
    Args:
        dstream(DStream): stream of real time tweets
    Returns:
        DStream Object with inner structure (hashtag, count)
    """
    
    # TODO: insert your code here
    # 0. Define a function called "updateFunc" compting running sum of hashtag counts:
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
    
    # 1. Define hashtag using the following techniques:
    #    1.1. "#Ab" and "#ab" are the same: hashtag = words.map(lambda x: x.lower())
    #    1.2. Filter out hashtags:  .filter(lambda x: len(x) > 2 and x[0] == "#")
    #    1.3. map (hashtag) to (hashtag, 1): .map(lambda x: (x, 1))
    hashtag = words.map(lambda x: x.lower()).filter(lambda x: len(x) > 2 and x[0] == "#").map(lambda x: (x, 1))
    # 2. Define hashtag count using reduceByKey
    hashtag_cnt = hashtag.reduceByKey(lambda cnt1, cnt2: cnt1 + cnt2)
    # 3. Update total hashtag count using updateStateByKey and the updateFunc defined above:
    hashtag_cnt_total = hashtag_cnt.updateStateByKey(updateFunc)
    # 4. Sort the hashtag counts using transform transformation:
    #       .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
    hashtag_cnt_total = hashtag_cnt.updateStateByKey(updateFunc).transform(
        lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
    # 5. Return total hashtag count:
    return hashtag_cnt_total


def wordCount(words):
    """
    Calculte the count of 5 sepcial words for every 60 seconds (window no overlap)
    You can choose your own words.
    Your should:
    1. filter the words
    2. count the word during a special window size
    3. add a time related mark to the output of each window, ex: a datetime type
    Hints:
        You can take a look at reduceByKeyAndWindow transformation
        Dstream is a serious of rdd, each RDD in a DStream contains data from a certain interval
        You may want to take a look of transform transformation of DStream when trying to add a time
    Args:
        dstream(DStream): stream of real time tweets
    Returns:
        DStream Object with inner structure (word, count, time)
    """
    
    # TODO: insert your code here
    # 1. Define word_cnt using the following techniques:
    #    1.1. "#Ab" and "#ab" are the same: word_cnt = words.map(lambda x: x.lower())
    #    1.2. Filter the words we want: .filter(lambda word: word in WORD)
    #    1.3. map (word) to (word, 1): .map(lambda x: (x, 1))
    #    1.4. Use reduceByKeyAndWindow transformation to count the word during a special window size: 
    #           .reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 60, 60)
    word_cnt = words.map(lambda x: x.lower()).filter(lambda word: word in WORD).map(
        lambda x: (x, 1)).reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 60, 60)
    # 2. Use transform and map transformations to update total word count by adding 
    #    date and time related mark to the output:
    word_cnt_total = word_cnt.transform(lambda time, rdd: rdd.map(
        lambda x: (x[0], x[1], time.strftime("%Y-%m-%d %H:%M:%S"))))
    
    return word_cnt_total


if __name__ == '__main__':
    # Spark settings
    conf = SparkConf()
    conf.setMaster('local[2]')
    conf.setAppName("TwitterStreamApp")

    # create spark context with the above configuration
    sc = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")

    # create sql context, used for saving rdd
    sql_context = SQLContext(sc)

    # create the Streaming Context from the above spark context with batch interval size 60 seconds
    ssc = StreamingContext(sc, 60)
    #ssc = StreamingContext(sc, 5)
    # setting a checkpoint to allow RDD recovery
    ssc.checkpoint("~/checkpoint_TwitterApp")

    # read data from port 9001
    dataStream = ssc.socketTextStream(IP, PORT)
    dataStream.pprint()

    words = dataStream.flatMap(lambda line: line.split(" "))

    # calculate the accumulated hashtags count sum from the beginning of the stream
    topTags = hashtagCount(words)
    topTags.pprint()

    # calculte the word count during each time period 6s
    wordCount = wordCount(words)
    wordCount.pprint()

    """
    save hashtags count and word count to google storage
    used to save to google BigQuery
    You should:
      1. topTags: only save the lastest rdd in DStream
      2. wordCount: save each rdd in DStream
    Hints:
      1. You can take a look at foreachRDD transformation
      2. You may want to use helper function saveToStorage
      3. You should use save output to output_directory_hashtags, output_directory_wordcount,
         and have output columns name columns_name_hashtags and columns_name_wordcount.    
    """
    # TODO: insert your code here
    # 0. Use foreachRDD transformation and saveToStorage helper function to 
    #    save hashtags count to google storage: 
    topTags.foreachRDD(lambda rdd: saveToStorage(rdd, output_directory_hashtags, 
                                                 columns_name_hashtags, mode="overwrite"))
    # 1. Use foreachRDD transformation and saveToStorage helper function to 
    #    save word count to google storage:
    wordCount.foreachRDD(lambda rdd: saveToStorage(rdd, output_directory_wordcount,
                                                   columns_name_wordcount, mode="append"))
    
    # start streaming process, wait for 600s and then stop.
    ssc.start()
    time.sleep(STREAMTIME)
    ssc.stop(stopSparkContext=False, stopGraceFully=True)
    print("Running Finished.")
    
    # put the temp result in google storage to google BigQuery
    saveToBigQuery(sc, output_dataset, output_table_hashtags, output_directory_hashtags)
    saveToBigQuery(sc, output_dataset, output_table_wordcount, output_directory_wordcount)
    print("Saved To BigQuery Finished.")


-------------------------------------------
Time: 2020-04-03 05:22:00
-------------------------------------------
RT @fructosepapi: “funny how bitches turn into my fans”WHAT ABOUT EUIWOONG ANG HYEONGSEOPRT @RBReich: Jeff Bezos' LA mansion has:

—3 saunas
—2 guesthouses
—A golf course
—A tennis court
—A "motor court" with gas pumps

His DC m…RT @KamalaHarris: Fifty days ago I attended a Homeland Security committee meeting about the coronavirus—and no current Trump administration…RT @Emily_Baum: NEW: Two months before the novel coronavirus likely began spreading in Wuhan, the Trump administration ended a $200-million…RT @BeVipul: #Corona_Jihad ? Over the world. https://t.co/7MGR662EOVRT @JoyceWhiteVance: This brave man tried to protect his sailors at the cost of his own career. He should be getting a medal not a demotion…Humanity before anything else. #OneWorldOneFight ❤️RT @AskAnshul: This person is now arrested in Maharashtra.

...

-------------------------------------------
Time: 202

-------------------------------------------
Time: 2020-04-03 05:24:00
-------------------------------------------
('movie', 63, '2020-04-03 05:24:00')
('ai', 8, '2020-04-03 05:24:00')
('coronavirus', 277, '2020-04-03 05:24:00')
('data', 12, '2020-04-03 05:24:00')
('spark', 1, '2020-04-03 05:24:00')

-------------------------------------------
Time: 2020-04-03 05:25:00
-------------------------------------------
3/17 1…RT @DougDavidsonYR: God bless your soul.RT @JoyAnnReid: "As companies nationwide look for relief, the Trump Organization has talked with Deutsche Bank and a Florida county about d…RT @vanitaguptaCR: Jared Kushner. The Patron Saint of Unqualified White Men. https://t.co/29X3cacv29RT @maddow: Milwaukee Journal-Sentinel editorial against the (astonishing) plan by Wisconsin to go ahead with its primary next week:

"As c…RT @SafetyPinDaily: The Evil Tucked Into the $2 Trillion Coronavirus Stimulus Bill || Via: Time https://t.co/hkZsCwSJeGRT @nazmaaman: Hi there @aamir_khan I'm

-------------------------------------------
Time: 2020-04-03 05:27:00
-------------------------------------------
('movie', 80, '2020-04-03 05:27:00')
('ai', 13, '2020-04-03 05:27:00')
('coronavirus', 496, '2020-04-03 05:27:00')
('data', 22, '2020-04-03 05:27:00')

-------------------------------------------
Time: 2020-04-03 05:28:00
-------------------------------------------
RT @vanitaguptaCR: Jared Kushner. The Patron Saint of Unqualified White Men. https://t.co/29X3cacv29RT @MRCRUZv3: THAT BITCH CAROLE BASKINRT @RexChapman: In Toronto, several tenants of a building were worried they couldn't pay their rent during the Coronavirus pandemic.

Their…me and krys just watched sharkboy and lava girl it was the worst fucking movie ive ever watchedThe transmission rate for #coronavirus is 33% per day in an urban area without any lockdown or protection.Did a sim… https://t.co/VOig6O2KF3Mayo Clinic starts using autonomous vehicles to deliver coronavirus tests and medical supplies : #analytics

-------------------------------------------
Time: 2020-04-03 05:31:00
-------------------------------------------
RT @tehseenp: The questions that should have been asked in the #ModiVideoMessage . 
#COVID19Pandemic 
The #ModiVirus is MORE dangerous than…Coronavirus Anxiety Is Real: How To Stay Mentally Strong During The COVID-19 Pandemic - Dr. Judson Brewer - London… https://t.co/KYGE2sXcuZBreaking News #danielnewman #marktakano #nancypelosi #kencalvert #riversidecounty #associatedpress #nbc #cbs #pbs… https://t.co/YbMWVcPDLsWhat's Your Age Releasing the Period Of Ayan Movie 😉

#11YearsOfAyanThank youRT The_Real_Fly: The number of commercial real estate borrowers who requested debt relief during the escalation of the coronavirus outbrea…RT @CNN: Washington Gov. Jay Inslee is calling for a national strategy to combat the coronavirus pandemic. 

"This is a national problem. I…RT @RobertDeNiroUS: -Who did this? 😅 #coronavirus https://t.co/kCRaPHuLr5Try it by yourself!
Never forward anythi

-------------------------------------------
Time: 2020-04-03 05:40:00
-------------------------------------------

-------------------------------------------
Time: 2020-04-03 05:41:00
-------------------------------------------

-------------------------------------------
Time: 2020-04-03 05:41:00
-------------------------------------------
('#coronavirus', 669)
('#covid19', 174)
('#coronavirus.', 55)
('#covidー19', 45)
('#ai', 45)
('#china', 35)
('#covid19pandemic', 35)
('#bigdata', 29)
('#selflessservicebydssvolunteers', 29)
('#iot', 29)
...

-------------------------------------------
Time: 2020-04-03 05:41:00
-------------------------------------------

-------------------------------------------
Time: 2020-04-03 05:42:00
-------------------------------------------

-------------------------------------------
Time: 2020-04-03 05:42:00
-------------------------------------------
('#coronavirus', 669)
('#covid19', 174)
('#coronavirus.', 55)
('#covidー19', 45)
('#ai', 45)
('#china', 35

In [5]:
sc.stop()