In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
from google.cloud import bigquery
from os import listdir
import sys
import requests
import time
import subprocess
import re,os
import pandas as pd

In [2]:
    # global variables
# Name of the storage bucket; only referenced by the commented-out gs:// paths below.
bucket = "streamprocessing-group12"
# Bucket-based output locations, kept for reference (currently disabled in favour of local paths):
# output_directory_hashtags = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/hashtagsCount'.format(bucket)
# output_directory_wordcount = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/wordcount'.format(bucket)
# Local directories (relative to the working directory) that receive the CSV output.
output_directory_hashtags = './hashtags'
output_directory_wordcount = './wordcount'
# Output table names and column names.
# The table name doubles as the base name of the CSV file ('<table>.csv') in saveToStorage.
# output_dataset = 'result'                     # name of the dataset in BigQuery (disabled)
output_table_hashtags = 'hashtags'
columns_name_hashtags = ['hashtags', 'count']
output_table_wordcount = 'wordcount'
columns_name_wordcount = ['word', 'count', 'time']

In [3]:
    # parameter
IP = 'localhost'    # host the socket text stream connects to
PORT = 9001       # port the socket text stream connects to
STREAMTIME = 960         # how long the streaming process runs, in seconds
WORD = ['covid-19', 'stayathome', 'hero', 'nursing']     # words counted by wordCount; tokens are lowercased before the lookup, so keep these lowercase

In [4]:
# Helper functions
def saveToStorage(rdd, output_table, output_directory, columns_name, mode):
    """
    Save one RDD of a DStream as a single CSV file named ``<output_table>.csv``
    inside ``output_directory``.

    A non-empty RDD is converted to a DataFrame, collapsed to one partition and
    written with a header row. Spark picks the name of the file it writes, so
    the first ``*.csv`` file found in the directory is then renamed to
    ``<output_table>.csv``.

    An empty RDD writes nothing. If the directory does not exist yet, or holds
    no CSV file, the function returns quietly instead of raising.

    Args:
        rdd: input rdd
        output_table: base name (without extension) of the resulting CSV file
        output_directory: output directory
        columns_name: columns name of dataframe
        mode: mode = "overwrite", overwrite the existing output
              mode = "append", append data to the existing output
    """
    file_name = '{}.csv'.format(output_table)

    if not rdd.isEmpty():
        (rdd.toDF(columns_name)
            .repartition(1)
            .write.option("header", "true")
            .csv(path=output_directory, mode=mode))

    # Early batches can be empty, in which case nothing has been written and
    # the output directory may not exist yet.
    if not os.path.isdir(output_directory):
        return

    csv_files = [name for name in listdir(output_directory) if name.endswith('.csv')]
    if not csv_files:
        return

    current_name = csv_files[0]
    if current_name != file_name:
        os.rename(os.path.join(output_directory, current_name),
                  os.path.join(output_directory, file_name))

def saveToBigQuery(sc, output_dataset, output_table, directory):
    """
    Load the ``part-*`` files under ``directory`` into the BigQuery table
    ``<output_dataset>.<output_table>`` with the ``bq`` command-line tool,
    then recursively delete ``directory`` through the Hadoop FileSystem API.

    The load replaces the table contents (``--replace``) and lets BigQuery
    infer the schema (``--autodetect``).

    NOTE(review): the files are loaded as NEWLINE_DELIMITED_JSON, while
    saveToStorage in this file writes CSV and renames the part file. Confirm
    the expected format and file pattern before re-enabling the calls.

    Args:
        sc: SparkContext, used to reach the Hadoop FileSystem for the cleanup
        output_dataset: name of the BigQuery dataset
        output_table: name of the BigQuery table
        directory: directory that holds the part files
    Raises:
        subprocess.CalledProcessError: if ``bq load`` exits with a non-zero status
    """
    files = directory + '/part-*'
    # Pass argv as a list so values containing whitespace stay one argument
    # (formatting a single string and calling .split() would break them up).
    subprocess.check_call([
        'bq', 'load',
        '--source_format', 'NEWLINE_DELIMITED_JSON',
        '--replace',
        '--autodetect',
        '{dataset}.{table}'.format(dataset=output_dataset, table=output_table),
        files,
    ])
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
        output_path, True)

# def saveTolocal(sc, columns_name, output_table):
#     df = SQLContext.createDataFrame(sc, columns_name)
#     df.coalesce(1).write.format(output_table).options(header='true').save('./csv_out')

In [5]:
def aggregate_count(newValues, runningCount):
    """
    State-update function passed to updateStateByKey.

    Args:
        newValues: values that arrived for a key in the current batch
        runningCount: total accumulated so far for that key, or None when
                      there is no previous total
    Returns:
        The previous total (0 if None) plus every value in newValues
    """
    total = 0 if runningCount is None else runningCount
    for value in newValues:
        total = total + value
    return total

In [6]:
def hashtagCount(words):
    """
    Keep a running count of hashtags since the start of the stream, ordered
    from the most to the least frequent.

    Counting ignores case: "#Ab" and "#ab" are the same hashtag. Any token
    that contains '#' (other than a bare "#") is treated as a hashtag.

    Args:
        words(DStream): stream of individual tokens taken from the tweets
    Returns:
        DStream Object with inner structure (hashtag, count)
    """
    def looks_like_hashtag(token):
        # A lone '#' carries no tag text, so it is skipped.
        return token != '#' and '#' in token

    def descending_by_count(rdd):
        return rdd.sortBy(lambda pair: -pair[1])

    # (hashtag, 1) pairs, lowercased so that different casings share a key
    tagged_ones = words.filter(looks_like_hashtag).map(lambda token: (token.lower(), 1))
    # fold this batch's pairs into the totals carried over from earlier batches
    running_totals = tagged_ones.updateStateByKey(aggregate_count)
    return running_totals.transform(descending_by_count)

In [7]:
def wordCount(words, target_words=None, window_seconds=120, slide_seconds=120):
    """
    Count the occurrences of a set of target words per time window.

    With the defaults the window is 120 seconds long and advances every
    120 seconds, so consecutive windows do not overlap. Matching ignores case:
    both the incoming tokens and the target words are lowercased first.

    Args:
        words(DStream): stream of individual tokens taken from the tweets
        target_words: iterable of words to count; defaults to the module-level
                      WORD list when None
        window_seconds: length of the counting window, in seconds
        slide_seconds: interval at which a new window result is produced,
                       in seconds
    Returns:
        DStream Object with inner structure (word, count, time), where time is
        the string form of the batch time handed to transform
    """
    source_words = WORD if target_words is None else target_words
    # Lowercase the targets as well, so an entry such as "Hero" still matches
    # the lowercased tokens.
    wanted = frozenset(word.lower() for word in source_words)

    matches = (words
               .filter(lambda token: token.lower() in wanted)
               .map(lambda token: (token.lower(), 1)))
    # Both the reduce function (x + y) and its inverse (x - y) are supplied.
    windowed_counts = matches.reduceByKeyAndWindow(
        lambda x, y: x + y, lambda x, y: x - y, window_seconds, slide_seconds)
    # The two-argument form of transform receives the batch time with the RDD.
    return windowed_counts.transform(
        lambda batch_time, rdd: rdd.map(
            lambda pair: (pair[0], pair[1], str(batch_time))))

In [None]:
    # Spark settings
conf = SparkConf()
conf.setMaster('local[2]')
conf.setAppName("TwitterStreamApp")

# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# create sql context, used for saving rdd
sql_context = SQLContext(sc)

# create the Streaming Context from the above spark context with batch interval size 5 seconds
ssc = StreamingContext(sc, 5)

# setting a checkpoint to allow RDD recovery;
# "~" is expanded here so the path points at the user's home directory
# instead of being handed to Spark as a literal "~" path component
ssc.checkpoint(os.path.expanduser("~/checkpoint_TwitterApp"))

# read data from the socket at IP:PORT
dataStream = ssc.socketTextStream(IP, PORT)
# dataStream.pprint()

# split every incoming line into space-separated tokens
words = dataStream.flatMap(lambda line: line.split(" "))
# calculate the accumulated hashtags count sum from the beginning of the stream
topTags = hashtagCount(words)
# topTags.pprint()
# calculate the count of the words in WORD for each window (see wordCount)
CountResult = wordCount(words)
# CountResult.pprint()

# save hashtags count and word count to storage
topTags.foreachRDD(lambda a: saveToStorage(a, output_table_hashtags, output_directory_hashtags, columns_name_hashtags, "overwrite"))
CountResult.foreachRDD(lambda b: saveToStorage(b, output_table_wordcount, output_directory_wordcount,columns_name_wordcount,"overwrite"))

# start the streaming process, let it run for at most STREAMTIME seconds, then stop.
ssc.start()
try:
    # Unlike a plain sleep, this returns early if the stream stops and
    # re-raises an error reported by the streaming job.
    ssc.awaitTerminationOrTimeout(STREAMTIME)
finally:
    # Always stop the streaming context (but keep the SparkContext alive),
    # even when the cell is interrupted or the job failed.
    ssc.stop(stopSparkContext=False, stopGraceFully=True)

# saveToBigQuery(sc, output_dataset, output_table_hashtags, output_directory_hashtags)
# saveToBigQuery(sc, output_dataset, output_table_wordcount, output_directory_wordcount)