<h1 align="center">EECS E6893 Big Data Analytics - Homework Assignment 3</h1>
<h2 align="center">Name: Qi Wang</h2>
<h2 align="center">UNI: qw2261</h2>


This module implements the Spark Streaming analysis process.


Usage:
    If used with Dataproc:
        gcloud dataproc jobs submit pyspark --cluster <Cluster Name> twitterHTTPClient.py

    Create a dataset in BigQuery first using
        bq mk bigdata_sparkStreaming

    Remember to replace the bucket with your own bucket name.


Todo:
    1. hashtagCount: calculate the accumulated hashtag counts
    2. wordCount: calculate the word count every 60 seconds;
        the words you should track are listed below.
    3. save the results to Google BigQuery



In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import time
import subprocess
import re
from google.cloud import bigquery

In [2]:
# global variables
bucket = "big_data_homework3"    # TODO : replace with your own bucket name
output_directory_hashtags = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/hashtagsCount'.format(bucket)
output_directory_wordcount = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/wordcount'.format(bucket)

In [3]:
# output table and columns name
output_dataset = 'homework0-253123:hw_3'                     #the name of your dataset in BigQuery
output_table_hashtags = 'hashtags'
columns_name_hashtags = ['hashtags', 'count']
output_table_wordcount = 'wordcount'
columns_name_wordcount = ['word', 'count', 'time']

In [4]:
# parameter
IP = 'localhost'    # ip address
PORT = 9001       # port

STREAMTIME = 600          # time that the streaming process runs

WORD = ['data', 'spark', 'ai', 'movie', 'good']     # the words to filter and count

In [5]:
# Helper functions
def saveToStorage(rdd, output_directory, columns_name, mode):
    """
    Save an RDD of the DStream to Google Cloud Storage as JSON
    Args:
        rdd: input rdd
        output_directory: output directory in Google Cloud Storage
        columns_name: column names of the DataFrame
        mode: mode = "overwrite", overwrite the existing output
              mode = "append", append data to the existing output
    """
    if not rdd.isEmpty():
        (rdd.toDF(columns_name)
            .write.save(output_directory, format="json", mode=mode))
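`saveToStorage` relies on Spark's DataFrame JSON writer, which writes one JSON object per line; that is the layout `bq load --source_format NEWLINE_DELIMITED_JSON` reads back in `saveToBigQuery`. As a rough illustration that needs no Spark, the sketch below renders rows in the same shape for the hashtag table. `rows_to_json_lines` is a hypothetical helper, and real part files may differ in details such as spacing or escaping.

```python
import json


def rows_to_json_lines(rows, columns_name):
    """Hypothetical helper: render tuples as newline-delimited JSON,
    one object per row keyed by columns_name."""
    lines = []
    for row in rows:
        record = dict(zip(columns_name, row))
        # compact separators and raw UTF-8, one object per line
        lines.append(json.dumps(record, ensure_ascii=False, separators=(",", ":")))
    return "\n".join(lines) + "\n"


sample = rows_to_json_lines([("#ai", 94), ("#halloween", 34)], ["hashtags", "count"])
print(sample, end="")
# {"hashtags":"#ai","count":94}
# {"hashtags":"#halloween","count":34}
```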

In [6]:
def saveToBigQuery(sc, output_dataset, output_table, directory):
    """
    Put temp streaming json files in google storage to google BigQuery
    and clean the output files in google storage
    """
    files = directory + '/part-*'
    subprocess.check_call(
        'bq load --source_format NEWLINE_DELIMITED_JSON '
        '--replace '
        '--autodetect '
        '{dataset}.{table} {files}'.format(
            dataset=output_dataset, table=output_table, files=files
        ).split())
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
        output_path, True)
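`saveToBigQuery` builds one command string and splits it on whitespace before handing it to `subprocess.check_call`. The sketch below reproduces only that string-building step in a hypothetical helper, `build_bq_load_args`, so the resulting argument list can be inspected without running `bq`; the dataset, table and bucket values are the ones configured earlier in this notebook.

```python
def build_bq_load_args(output_dataset, output_table, directory):
    """Hypothetical helper mirroring how saveToBigQuery turns its
    command string into an argument list for subprocess.check_call."""
    files = directory + '/part-*'
    return ('bq load --source_format NEWLINE_DELIMITED_JSON '
            '--replace '
            '--autodetect '
            '{dataset}.{table} {files}'.format(
                dataset=output_dataset, table=output_table, files=files
            )).split()


args = build_bq_load_args(
    'homework0-253123:hw_3', 'hashtags',
    'gs://big_data_homework3/hadoop/tmp/bigquery/pyspark_output/hashtagsCount')
print(args)
```

Because no shell is involved, the `part-*` pattern reaches `bq` unexpanded and is resolved against Cloud Storage by BigQuery; it also means a bucket or directory name containing spaces would be split into separate arguments.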

In [7]:
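def computeRunningSum(newValues, runningCount):
    """
    Update function for updateStateByKey: add this batch's counts to the
    running total (runningCount is None the first time a key is seen).
    """
    if runningCount is None:
        runningCount = 0
    return sum(newValues) + runningCount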
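`updateStateByKey` calls the update function once per key and batch, passing the list of new values for that key and the previous state (`None` the first time a key appears). The plain-Python sketch below imitates that contract to show how `computeRunningSum` turns per-batch counts into running totals. `simulate_update_state_by_key` is hypothetical and ignores partitioning, checkpointing, and Spark's rule that a `None` return removes the key.

```python
def computeRunningSum(newValues, runningCount):
    # same update function as the notebook cell above
    if runningCount is None:
        runningCount = 0
    return sum(newValues) + runningCount


def simulate_update_state_by_key(batches, update_fn):
    """Hypothetical model of updateStateByKey: for each batch of (key, value)
    pairs, group values by key and call update_fn(new_values, previous_state)
    for every key seen so far. Returns a snapshot of the state per batch."""
    state = {}
    history = []
    for batch in batches:
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # keys absent from this batch are still updated, with an empty list
        for key in set(state) | set(grouped):
            state[key] = update_fn(grouped.get(key, []), state.get(key))
        history.append(dict(state))
    return history


batches = [
    [("#ai", 1), ("#ai", 1), ("#halloween", 1)],
    [("#ai", 1)],
]
history = simulate_update_state_by_key(batches, computeRunningSum)
print(sorted(history[0].items()))  # [('#ai', 2), ('#halloween', 1)]
print(sorted(history[1].items()))  # [('#ai', 3), ('#halloween', 1)]
```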

In [8]:
def hashtagCount(words):
    """
    Calculate the accumulated hashtag count from the beginning of the stream
    and sort it in descending order of count.
    Ignore case when counting the hashtags:
        "#Ab" and "#ab" are considered the same hashtag
    You have to:
    1. Keep only the words that are hashtags.
       A hashtag usually starts with "#" followed by a series of alphanumeric characters
    2. map (hashtag) to (hashtag, 1)
    3. sum the count of the current DStream state and the previous state
    4. transform the unordered DStream into an ordered DStream
    Hints:
        you may use regular expressions to filter the words
        You can take a look at the updateStateByKey and transform transformations
    Args:
        words (DStream): stream of words split from real-time tweets
    Returns:
        DStream Object with inner structure (hashtag, count)
    """

    # TODO: insert your code here
    # keep tokens that start with '#' and have at least one more character,
    # lower-cased so that "#Ab" and "#ab" share a key
    stream_hashtag = (words
                      .filter(lambda v: len(v) > 1 and v[0] == '#')
                      .map(lambda v: (v.lower(), 1)))
    # accumulate counts across batches, then sort each batch by count (descending)
    return_hashtag = (stream_hashtag
                      .updateStateByKey(computeRunningSum)
                      .transform(lambda rdd: rdd.sortBy(lambda v: v[1], ascending=False)))
    return return_hashtag
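The docstring describes a hashtag as `#` followed by alphanumeric characters, while the filter above only checks the first character, so glued tokens such as `#Deep…RT`, which appear in the raw stream printed below, are counted as hashtags of their own. The sketch below is a stricter, Spark-free version of the same per-batch logic using a regular expression; `HASHTAG_RE`, `extract_hashtags` and `top_hashtags` are hypothetical names. In the streaming job the same test could go into `words.filter(lambda v: HASHTAG_RE.fullmatch(v) is not None)`. Note that `\w` does not match combining marks, so hashtags in scripts that use them, such as `#วาร์ปสิวะ`, would be dropped by this filter.

```python
import re
from collections import Counter

# '#' followed by one or more word characters; in Python 3, \w also
# matches letters from non-Latin scripts such as Hangul
HASHTAG_RE = re.compile(r"#\w+")


def extract_hashtags(line):
    """Split on single spaces (as the notebook's flatMap does) and keep
    tokens that are exactly one hashtag, lower-cased."""
    return [token.lower() for token in line.split(" ")
            if HASHTAG_RE.fullmatch(token)]


def top_hashtags(lines):
    """Count hashtags over all lines and sort by count, descending."""
    counts = Counter()
    for line in lines:
        counts.update(extract_hashtags(line))
    # sorted() is stable, so ties keep their first-seen order
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)


lines = ["I love #AI and #ai", "#Halloween #Deep…RT is here #", "#원호사랑해"]
print(top_hashtags(lines))
# [('#ai', 2), ('#halloween', 1), ('#원호사랑해', 1)]
```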
    

In [9]:
def wordCount(words):
    """
    Calculate the count of 5 special words in 60 seconds for every 60 seconds (no window overlap)
    You should:
    1. filter the words, case insensitive.
    2. count the words within a fixed window size
    3. add a time-related mark to the output of each window, e.g. a datetime type
    Hints:
        You can take a look at the reduceByKeyAndWindow transformation
        A DStream is a series of RDDs; each RDD in a DStream contains data from a certain interval
        You may want to take a look at the transform transformation of DStream when trying to add a time
    Args:
        words (DStream): stream of words split from real-time tweets
    Returns:
        DStream Object with inner structure (word, count, time)
    """

    # TODO: insert your code here
    # lower-case every token and keep only the tracked words in WORD
    filtered_words = (words
                      .map(lambda v: v.lower())
                      .filter(lambda word: word in WORD)
                      .map(lambda v: (v, 1)))
    # 60-second window sliding every 60 seconds (tumbling, no overlap);
    # time.ctime() is evaluated as each record is mapped, so records of
    # one window can carry slightly different timestamps
    return_words = (filtered_words
                    .reduceByKeyAndWindow(lambda a, b: a + b, 60, 60)
                    .map(lambda v: (v[0], v[1], time.ctime())))

    return return_words
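With a 60-second batch interval and `reduceByKeyAndWindow(..., 60, 60)`, each window holds exactly one batch and windows do not overlap. Because `time.ctime()` runs when each record is mapped, rows from the same window can get different stamps (compare `22:54:08` and `22:54:09` in the output below). PySpark's `DStream.transform` also accepts a two-argument function `(time, rdd)` whose `time` is the batch time as a `datetime`, so `.transform(lambda t, rdd: rdd.map(lambda v: (v[0], v[1], t.ctime())))` would give every row of a window the same stamp. The Spark-free sketch below models that behaviour; `tumbling_word_count` is hypothetical, uses half-open windows `[start, end)`, and labels each window with its end time, which is roughly how the `Time:` headers label batches.

```python
from collections import Counter
from datetime import datetime, timedelta

WORD = ['data', 'spark', 'ai', 'movie', 'good']  # same tracked words as the notebook


def tumbling_word_count(events, start, window_seconds=60):
    """Hypothetical model of a tumbling window: events are (datetime, token)
    pairs; tokens are lower-cased, filtered against WORD and counted per
    non-overlapping window. Each row is stamped with its window's end time."""
    buckets = {}
    for ts, token in events:
        word = token.lower()
        if word not in WORD:
            continue
        index = int((ts - start).total_seconds() // window_seconds)
        buckets.setdefault(index, Counter())[word] += 1
    rows = []
    for index in sorted(buckets):
        window_end = start + timedelta(seconds=window_seconds * (index + 1))
        for word, count in sorted(buckets[index].items()):
            rows.append((word, count, window_end.ctime()))
    return rows


start = datetime(2019, 10, 31, 22, 53, 0)
events = [
    (datetime(2019, 10, 31, 22, 53, 5), "Movie"),
    (datetime(2019, 10, 31, 22, 53, 40), "movie"),
    (datetime(2019, 10, 31, 22, 53, 59), "AI"),
    (datetime(2019, 10, 31, 22, 54, 10), "movie"),
    (datetime(2019, 10, 31, 22, 54, 20), "horror"),  # not tracked, dropped
]
rows = tumbling_word_count(events, start)
for row in rows:
    print(row)
# ('ai', 1, 'Thu Oct 31 22:54:00 2019')
# ('movie', 2, 'Thu Oct 31 22:54:00 2019')
# ('movie', 1, 'Thu Oct 31 22:55:00 2019')
```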

In [10]:
def saveToStorageHashTag(rdd):
    saveToStorage(rdd, output_directory_hashtags, columns_name_hashtags, "overwrite")

In [11]:
def saveToStorageWordCount(rdd):
    saveToStorage(rdd, output_directory_wordcount, columns_name_wordcount, "append")
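The two wrappers differ only in the write mode. With `foreachRDD`, the hashtag DStream overwrites its output on every non-empty batch, so after the run the directory holds only the latest accumulated (and already sorted) counts, while the word-count DStream appends one set of rows per window. The hypothetical `simulate_saves` below models just that effect on a list of rows, skipping empty batches the way `saveToStorage` does.

```python
def simulate_saves(batches, mode):
    """Hypothetical model of an output directory after successive
    foreachRDD calls: "overwrite" replaces earlier rows, "append" adds
    to them. Empty batches are skipped, like rdd.isEmpty() in saveToStorage."""
    stored = []
    for batch in batches:
        if not batch:
            continue
        if mode == "overwrite":
            stored = list(batch)
        elif mode == "append":
            stored.extend(batch)
        else:
            raise ValueError("unsupported mode: " + mode)
    return stored


print(simulate_saves([[("#ai", 90)], [], [("#ai", 94), ("#halloween", 34)]], "overwrite"))
# [('#ai', 94), ('#halloween', 34)]
print(simulate_saves([[("movie", 131)], [], [("movie", 130)]], "append"))
# [('movie', 131), ('movie', 130)]
```

This is why the `hashtags` table ends up with one row per hashtag (the final totals), while the `wordcount` table keeps one row per word per window.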

In [12]:
if __name__ == '__main__':
    # Spark settings
    conf = SparkConf()
    conf.setMaster('local[2]')
    conf.setAppName("TwitterStreamApp")

    # create spark context with the above configuration,
    # stopping any context the notebook kernel already started
    try:
        sc.stop()
    except NameError:
        pass
    SparkContext.setSystemProperty('spark.executor.memory', '16g')
    SparkContext.setSystemProperty('spark.driver.memory', '16g')
    sc = SparkContext(conf = conf)
    sc.setLogLevel("ERROR")

    # create sql context, used for saving rdd
    sql_context = SQLContext(sc)

    # create the Streaming Context from the above spark context with batch interval size 60 seconds
    ssc = StreamingContext(sc, 60)
    # setting a checkpoint to allow RDD recovery
    ssc.checkpoint("~/checkpoint_TwitterApp")

    # read data from port 9001
    dataStream = ssc.socketTextStream(IP, PORT)
    dataStream.pprint()

    words = dataStream.flatMap(lambda line: line.split(" "))

    # calculate the accumulated hashtags count sum from the beginning of the stream
    topTags = hashtagCount(words)
    topTags.pprint()

    # Calculate the word count during each 60-second period
    # (a new name avoids shadowing the wordCount function on re-runs)
    wordCounts = wordCount(words)
    wordCounts.pprint()

    # save hashtag counts and word counts to Google Cloud Storage,
    # from where they are loaded into Google BigQuery
    # You should:
    #   1. topTags: only save the latest rdd in DStream
    #   2. wordCounts: save each rdd in DStream
    # Hints:
    #   1. You can take a look at the foreachRDD output operation
    #   2. You may want to use the helper function saveToStorage
    #   3. You should save the output to output_directory_hashtags and output_directory_wordcount,
    #       with output column names columns_name_hashtags and columns_name_wordcount.
    # TODO: insert your code here

    # hashtags use "overwrite" mode, so only the latest batch survives;
    # word counts use "append" mode, so every window is kept
    topTags.foreachRDD(saveToStorageHashTag)
    wordCounts.foreachRDD(saveToStorageWordCount)

    
    # start streaming process, wait for 600s and then stop.
    ssc.start()
    time.sleep(STREAMTIME)
    ssc.stop(stopSparkContext=False, stopGraceFully=True)
    print("Running Finished!")

    # put the temp result in google storage to google BigQuery
    saveToBigQuery(sc, output_dataset, output_table_hashtags, output_directory_hashtags)
    saveToBigQuery(sc, output_dataset, output_table_wordcount, output_directory_wordcount)
    print("Saved To BigQuery Finished!")

-------------------------------------------
Time: 2019-10-31 22:51:00
-------------------------------------------
@LauraBaileyVO @beanprincess19 Horror movie, but instead of slasher sound effects, anytime the monsters are nearby, you hear rolling dice.RT @kollyempire: SRK - Atlee movie titled as #Sanki and announcement coming up on Nov 2nd, the star's birthday special?

#Atlee #Bollywood…RT @DashieXP: So I was watching child’s play 2 and never realized there was a fire ass beat potential in this movie! I had to add some drum…RT @MikeQuindazzi: This is how #AI sees and interprets &gt;&gt;&gt; HT @Paula_Piccard and @jblefevre60 via @MikeQuindazzi &gt;&gt;&gt; #MachineLearning #Deep…RT @Roadmancode: Turkey is leading from the front in the war against the Vegans. https://t.co/aMhTconvpwI’m on #imdb cos I make the movie #filmdirector #filmcomposer #screenwriter #makingmovies  https://t.co/Ca39DqStIoThe secret to whisky's global success  https://t.co/ODPJdnVNxE # via @BBC_FutureRT @LORDY_BAR

-------------------------------------------
Time: 2019-10-31 22:54:00
-------------------------------------------
('movie', 131, 'Thu Oct 31 22:54:08 2019')
('ai', 11, 'Thu Oct 31 22:54:08 2019')
('good', 4, 'Thu Oct 31 22:54:08 2019')
('data', 3, 'Thu Oct 31 22:54:09 2019')
('spark', 11, 'Thu Oct 31 22:54:09 2019')

-------------------------------------------
Time: 2019-10-31 22:55:00
-------------------------------------------
#HalloweenFridayFashion Therapy # 1475 https://t.co/eopUUiBcJc https://t.co/8NKh8RKt48@AllieGoertz I love everything about that movie. Great writing, acting, directing, and an awesome soundtrack as wel… https://t.co/PmBk6BJeFfRT @catsuka: Animations from "The Legend Of Hei" chinese movie.
This is the last (31/31) daily post of my #LegendOfHeInktober series.
Thank…Earned 2 PSN trophies (1 silver and 1 bronze) in AI: The Somnium Files https://t.co/nTSesb1NfaLong before James Gunn rose to total pop-culture prominence with the two 'Guardians of the Galaxy' films, h

-------------------------------------------
Time: 2019-10-31 22:58:00
-------------------------------------------
('movie', 130, 'Thu Oct 31 22:58:09 2019')
('ai', 8, 'Thu Oct 31 22:58:09 2019')
('good', 4, 'Thu Oct 31 22:58:09 2019')
('spark', 8, 'Thu Oct 31 22:58:09 2019')
('data', 1, 'Thu Oct 31 22:58:09 2019')

-------------------------------------------
Time: 2019-10-31 22:59:00
-------------------------------------------
Thank you to… https://t.co/hYdiOSZfIbVote for #WeMadeIt! 😁RT @dabi_hawks: ANYWAY if you type "Google drive" after a movie title in your search bar then you'll probably find pretty much any movie ou…Girl the eeriness of this movie..... like who knew being in broad day light in the middle of Sweden was spooking me@ThatKevinSmith @LAWeekly When will we get to see the movie in Australia @ThatKevinSmithRT @Neeky96: And this is me running to see the movie when it comes out 😍
Can't wait for it 😍 #TheKillingOfTwoLovers #ClayneCrawford @Clayne…@TondarSarkazian A deep movi

-------------------------------------------
Time: 2019-10-31 23:07:00
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 23:08:00
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 23:08:00
-------------------------------------------
('#ai', 94)
('#원호사랑해', 45)
('#wonhocomeback', 42)
('#halloween', 34)
('#machinelearning', 28)
('#legendofheinktober', 24)
('#lookathernow', 22)
('#933kobfmtakeover', 22)
('#wemadeit', 22)
('#วาร์ปสิวะ', 18)
...

-------------------------------------------
Time: 2019-10-31 23:08:00
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 23:09:00
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 23:09:00
-------------------------------------------
('#ai', 94)
('#원호사랑해', 45)
('#wonhocomeback', 42)
('#halloween', 34)
('#machinelearning', 28)
('#leg