In [93]:
# Imports
# Batch job: compute per-uid totals over ~40 GB of tweet data
# and save the results to a database.

import sys
import datetime
from datetime import timedelta

sys.path.append('../../')

from pyspark.sql import SparkSession, Row
from pyspark.ml import Pipeline

from pyspark.sql.functions import explode

from dateutil.parser import parse
# for tokenizing
from nltk.tokenize import sent_tokenize, word_tokenize, RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
# for schema
from pyspark.sql.types import *

# for testing 
import random


# Build (or reuse) the SparkSession for this batch job.
# NOTE(review): the master URL and Cassandra host are hard-coded EC2
# hostnames; consider reading them from config/env.
appname= "proto_submit_0"
master_url = "spark://ec2-52-40-21-59.us-west-2.compute.amazonaws.com:7077"
# Session bound to the standalone master, the Cassandra connector
# (host/port), and event logging into a local directory.
spark = SparkSession.builder.appName(appname)\
                .master(master_url)\
                .config("spark.cassandra.connection.host", "ec2-52-40-21-59.us-west-2.compute.amazonaws.com")\
                .config("spark.cassandra.connection.port", "9042")\
                .config("spark.eventLog.enabled", True)\
                .config("spark.eventLog.dir", "/home/ubuntu/spark_tmp/")\
                .getOrCreate()

# Input locations. zip_path is not read in this cell; it is kept for
# reference (the tar archive carries metadata, so its layout differs from
# the plain monthly dump).
zip_path = 's3a://twitter-data-dump/test.tar'
# The single-file path is immediately overridden by the whole-month
# directory below; only the last assignment is actually read.
large_unzipped = 's3a://twitter-data-dump/2013-07/01_10_02.json.bz2'
large_unzipped = 's3a://twitter-data-dump/2013-07/'
df = spark.read.json(large_unzipped)

# df is a DataFrame whose schema Spark infers from the JSON input.

# Project the tweet attributes used downstream, flattening the nested
# user / place / entities fields under short aliases. Rows without a tweet
# id or a user id are dropped.
main_df = df.selectExpr('id AS tid',\
                        'user.id AS uid',\
                        'text AS tweet',\
                        'user.created_at AS creation_time',\
                        'user.time_zone AS time_zone',\
                        'user.followers_count AS followers_count',\
                        'user.friends_count AS friends_count',\
                        'place.name AS city_name',\
                        'place.country AS country_name',\
                        'entities.media.media_url AS media_ary',\
                        'entities.hashtags.text AS hashtag_ary',\
                        'retweet_count',\
                        'favorite_count'                        
                       ).where('tid is NOT NULL AND uid is NOT NULL')

#main_df.printSchema()

# Column names of main_df, in selectExpr order (map_row reads the row
# positionally in this order). Informational: reassigned in a later cell.
col_exp_set = ['tid','uid','tweet','creation_time',
               'time_zone','followers_count',
               'friends_count','city_name','country_name',
               'media_ary','hashtag_ary','retweet_count',
               'favorite_count',
              ]


###### Debugging: swap in the limit(3) line to work on 3 records only

#base_rdd = main_df.limit(3).rdd
#base_rdd.count()
# Row-level RDD view of main_df; map_row is applied to it later.
base_rdd = main_df.rdd
# Sanity check: runs a small Spark job to fetch one row.
base_rdd.take(1)

# Return a 2-tuple (sentence_count, word_list) for one tweet's text.
def process_tweet(description):
    """Tokenize tweet text into a sentence count and stemmed content words.

    Args:
        description: the tweet text; may be None or empty.

    Returns:
        ``(sentence_count, word_list)``. ``word_list`` holds the
        lower-cased, Snowball-stemmed word tokens that are not English stop
        words. ``(0, [])`` is returned for None/empty input.
    """
    # base case: nothing to tokenize
    if not description:
        return (0, [])

    stemmer = SnowballStemmer("english")
    tokenizer = RegexpTokenizer(r'\w+')
    stop_words = set(stopwords.words('english'))

    word_list = []
    for token in tokenizer.tokenize(description):
        lowered = token.lower()
        # The NLTK English stop-word list is lower-case, so the membership
        # test must use the lower-cased token; otherwise capitalized stop
        # words such as "The" would be kept.
        if lowered not in stop_words:
            word_list.append(stemmer.stem(lowered))

    return (len(sent_tokenize(description)), word_list)


# parse twitter time string to (date, timestamp_str, hour int)
# note: downstream, only timestamp_str is later cast with astype
def parse_time(creation_time):
    """Parse a tweet timestamp string into Cassandra-friendly parts.

    Args:
        creation_time: timestamp string from the tweet JSON (anything
            ``dateutil.parser.parse`` accepts); may be None.

    Returns:
        ``(date, timestamp_str, hour)``: ``date`` is the ``datetime.date``
        of the parsed value, ``timestamp_str`` is ``str()`` of the parsed
        datetime, and ``hour`` is its hour of day as an int. When the input
        cannot be parsed, the current system time is used instead.
    """
    def cassandra_convert(dt):
        # Split one datetime into (date, full timestamp string, hour).
        return (dt.date(), str(dt), dt.hour)

    try:
        dt = parse(creation_time)
    except Exception:
        # Best effort: a missing or malformed timestamp must not fail the
        # Spark task, so fall back to the current system time.
        # TODO: log the unparsable value.
        dt = datetime.datetime.now()
    return cassandra_convert(dt)

# Map one 13-column main_df row to an 18-field Row:
#   tweet         -> sentence_count, word_list   (process_tweet)
#   creation_time -> date, timestamp, hour       (parse_time)
#   media_ary     -> media_attached (bool)
#   hashtag_ary   -> tag_count (int)
# phrase_list is a placeholder and is always ['n/a'].
def map_row(row):
    """Enrich one tweet row for the batch aggregation.

    Args:
        row: a main_df row whose 13 columns are, in order: tid, uid, tweet,
            creation_time, time_zone, followers_count, friends_count,
            city_name, country_name, media_ary, hashtag_ary,
            retweet_count, favorite_count.

    Returns:
        A pyspark ``Row`` with 18 named fields. If reading or deriving any
        field raises, a sentinel Row is returned instead (ids/counts -1,
        strings 'n/a', date 2001-01-01), so one bad record does not fail
        the Spark task.

    NOTE: map_total_cols reads fields of this Row; keep field names stable.
    """
    def check_image(media_ary):
        # True iff at least one media URL is attached.
        return media_ary is not None and len(media_ary) > 0

    def count_tag(tag_ary):
        # Number of hashtags; a missing array counts as zero.
        return 0 if tag_ary is None else len(tag_ary)

    try:
        tid = row[0]
        uid = row[1]
        tweet = row[2]
        creation_time = row[3]
        time_zone = row[4]
        followers_count = row[5]
        friends_count = row[6]
        city_name = row[7]
        country_name = row[8]
        media_ary = row[9]
        hashtag_ary = row[10]
        retweet_count = row[11]
        favorite_count = row[12]

        sentence_count, word_list = process_tweet(tweet)
        date, timestamp, hour = parse_time(creation_time)
        media_attached = check_image(media_ary)
        tag_count = count_tag(hashtag_ary)
    except Exception:
        # TODO: log the offending record.
        # Every field gets an explicit sentinel. No local assigned in the
        # try block (e.g. tag_count) may be referenced here, because the
        # exception can fire before it is bound.
        return Row(tid=-1, uid=-1,
            followers_count=-1,
            friends_count=-1,
            tweet='n/a',retweet_count=-1,
            favorite_count=-1,sentence_count=-1,
            word_list=[],
            phrase_list=['n/a'],
            date=datetime.date(2001, 1, 1),timestamp='00:00:00',hour=0,
            time_zone='n/a',city_name='n/a',
            country_name='n/a',
            media_attached=False,tag_count=-1
            )

    return Row(tid=tid, uid=uid,
            followers_count=followers_count,
            friends_count=friends_count,
            tweet=tweet,retweet_count=retweet_count,
            favorite_count=favorite_count,sentence_count=sentence_count,
            word_list=word_list,
            phrase_list=['n/a'],
            date=date,timestamp=timestamp,hour=hour,
            time_zone=time_zone,city_name=city_name,
            country_name=country_name,
            media_attached=media_attached,tag_count=tag_count
            )
    
# Field names of the Row produced by map_row (18 fields). Informational
# only: not referenced by the code shown here. 'phrase_list' matches the
# actual Row field name.
col_exp_set = ['tid','uid','tweet','followers_count',
               'friends_count','retweet_count',
               'favorite_count','sentence_count',
               'word_list',
               'phrase_list',
               'date','timestamp','hour',
               'time_zone','city_name','country_name',
               'media_attached','tag_count'
              ]
# Sanity check: map every row and fetch one result (runs a Spark job).
base_map_rdd = base_rdd.map(map_row)
base_map_rdd.take(1)

# len(col_exp_set) --> 18

# Observed positional order of the Row built by map_row (fields sorted
# alphabetically, as with Spark < 3.0 / Python 2):
# city_name=0, country_name=1, date=2, 
# favorite_count=3, followers_count=4, friends_count=5,
# hour=6, media_attached=7, phrase_list=8, 
# retweet_count=9, sentence_count=10, tag_count=11, tid=12, time_zone=13, 
# timestamp=14, tweet=15, uid=16, word_list=17

# Project each mapped Row down to the fields needed for per-user totals.
def map_total_cols(row):
    """Return ``(uid, tid, retweet_count, favorite_count, word_list)``.

    Args:
        row: a Row produced by map_row (any object exposing those fields
            as attributes).

    Fields are read by name, not by position. The positional order of a
    keyword-built Row is alphabetical on older Spark but follows insertion
    order on Spark >= 3.0, so hard-coded indices would silently select the
    wrong columns after an upgrade.
    """
    return (row.uid, row.tid, row.retweet_count,
            row.favorite_count, row.word_list)
# Per-tweet tuples (uid, tid, retweet_count, favorite_count, word_list).
df_tot_raw = base_map_rdd.map(map_total_cols)

# Key each tuple by its uid (element 0) for the per-user aggregation.
df_tot_raw_k = df_tot_raw.keyBy(lambda x: x[0])
# Sanity check: fetch one keyed record (runs a Spark job).
df_tot_raw_k.take(1)
# NOTE: collect() would pull the whole keyed RDD to the driver.
#df_tot_raw_k.collect()

# helper-function for combineByKey
def add_to_dic(dic, word, count):
    """Accumulate *count* for *word* in *dic*, mutating *dic* in place.

    Each value is a ``(total, occurrences)`` pair: ``total`` is the sum of
    every count added for the word, and ``occurrences`` is how many times
    the word was added. Returns None.
    """
    if word not in dic:
        # First sighting: the count itself, seen once.
        dic[word] = (count, 1)
        return
    previous = dic[word]
    dic[word] = (previous[0] + count, previous[1] + 1)
        
def create_dic_set(row):
    """Build per-word retweet and favorite accumulators for one tweet.

    *row* is a ``(uid, tid, retweet_count, favorite_count, ..., word_list)``
    tuple. Each word in word_list is credited, via add_to_dic, with the
    tweet's retweet count in the first dict and its favorite count in the
    second. A dict is left empty when its count is not positive, and
    word_list is not touched at all when neither count is positive.

    Returns ``(rt_dic, fav_dic)``.
    """
    retweets = row[2]
    favorites = row[3]
    words = row[-1]

    retweet_acc = {}
    favorite_acc = {}
    track_rt = retweets > 0
    track_fav = favorites > 0

    if track_rt or track_fav:
        for word in words:
            if track_rt:
                add_to_dic(retweet_acc, word, retweets)
            if track_fav:
                add_to_dic(favorite_acc, word, favorites)
    return (retweet_acc, favorite_acc)


# value layout: uid = 0, tid = 1, retweet_count = 2, favorite_count = 3, word_list = -1
def createCombiner(row):
    """Start a combineByKey accumulator from the first value seen for a key.

    Returns ``([tid], rt_dic, fav_dic)``, where the dicts come from
    create_dic_set(row).
    """
    per_word = create_dic_set(row)
    rt_acc, fav_acc = per_word
    first_tid = row[1]
    return ([first_tid], rt_acc, fav_acc)

# merges 2 dic together
def merge_dic(dic1, dic2):
    """Merge two ``{word: (total, occurrences)}`` accumulators.

    For a word present in both dicts, the totals are summed and the
    occurrence counts are summed. To keep the work proportional to the
    smaller input, the smaller dict is folded into the larger one *in
    place* and the larger dict is returned (``dic1`` when sizes are equal),
    so callers must not rely on either argument staying unmodified.
    """
    if len(dic1) < len(dic2):
        small_dic, large_dic = dic1, dic2
    else:
        small_dic, large_dic = dic2, dic1
    for word, (total, occurrences) in small_dic.items():
        if word in large_dic:
            prev_total, prev_occurrences = large_dic[word]
            # Both components are summed: occurrences from each side add up.
            large_dic[word] = (total + prev_total,
                               occurrences + prev_occurrences)
        else:
            large_dic[word] = (total, occurrences)
    return large_dic

# combineByKey mergeValue: fold one value (V) into an accumulator (C)
def mergeValue(new_row, row):
    """Fold one more value for a key into an existing accumulator.

    Args:
        new_row: accumulator ``(tid_list, rt_dic, fav_dic)``.
        row: value tuple ``(uid, tid, retweet_count, favorite_count, ...,
            word_list)``.

    The row's per-word dicts are merged with the accumulator's via
    merge_dic, and the row's tid is appended to the accumulator's tid list
    (mutated in place). Returns the updated 3-tuple.
    """
    fresh_rt, fresh_fav = create_dic_set(row)
    incoming_tid = row[1]
    acc_tids, acc_rt, acc_fav = new_row[0], new_row[1], new_row[2]

    result = (acc_tids,
              merge_dic(fresh_rt, acc_rt),
              merge_dic(fresh_fav, acc_fav))
    acc_tids.append(incoming_tid)
    return result

# combineByKey mergeCombiners: combine two accumulators (C, C)
def mergeCombiners(r1, r2):
    """Combine two ``(tid_list, rt_dic, fav_dic)`` accumulators for one key.

    The tid lists are concatenated (r1's first); the retweet and favorite
    dicts are each merged with merge_dic.
    """
    tids_a, rt_a, fav_a = r1[0], r1[1], r1[-1]
    tids_b, rt_b, fav_b = r2[0], r2[1], r2[-1]
    return (tids_a + tids_b,
            merge_dic(rt_a, rt_b),
            merge_dic(fav_a, fav_b))

# Aggregate per uid. Each value becomes ([tid, ...], rt_dic, fav_dic),
# where the dicts map word -> (summed count, number of additions).
r = df_tot_raw_k.combineByKey(createCombiner, mergeValue, mergeCombiners)
#r.take(1)

# Debug: inspect a user whose retweet dict is non-empty.
#r.filter(lambda x: len(x[1][1]) > 0).take(1)

def get_top_6(row_v, top_n=6):
    """Return the *top_n* words with the highest average retweet count.

    Args:
        row_v: per-user accumulator ``(tid_list, rt_dic, fav_dic)``; only
            ``rt_dic`` (``{word: (total_retweets, occurrences)}``) is used.
        top_n: number of entries to return (default 6).

    Returns:
        A tuple of exactly *top_n* ``(word, average)`` pairs in descending
        order of average, padded with ``('n_a', -1)`` when the user has
        fewer distinct words. Averages use floor division so they stay
        integers, matching the IntegerType ``count`` column of the output
        schema.
    """
    import heapq

    rt_dic = row_v[1]
    averages = [(word, total // occurrences)
                for word, (total, occurrences) in rt_dic.items()]
    # Real top-N selection over every word; ties keep iteration order.
    top = heapq.nlargest(top_n, averages, key=lambda pair: pair[1])
    top.extend([('n_a', -1)] * (top_n - len(top)))
    return tuple(top)
                
# Per uid: the top retweet words as ((word, average), ...) pairs.
tmp = r.mapValues(get_top_6)

# Sanity check: fetch one record (runs a Spark job).
tmp.take(1)

#tmp.toDF()

# build the table schema: element type of the top-word array column
def build_tuple():
    """Return ``ArrayType`` of ``(word_name: string, nullable;
    count: int, non-nullable)`` structs."""
    fields = [
        StructField("word_name", StringType(), True),
        StructField("count", IntegerType(), False),
    ]
    element = StructType(fields)
    return ArrayType(element)

def build_schema():
    """Return the schema of the per-user top-word rows.

    Columns:
        uid: IntegerType, nullable.
        top_word_list: build_tuple() (array of word_name/count structs),
            nullable.

    NOTE(review): Twitter user ids are 64-bit, while IntegerType holds only
    32-bit values. Consider LongType once the target Cassandra column type
    is confirmed.
    """
    # Reuse build_tuple() instead of repeating the struct definition inline.
    return StructType([
        StructField("uid", IntegerType(), True),
        StructField("top_word_list", build_tuple(), True),
    ])

# Fit the (uid, top-word pairs) RDD to the explicit schema: uid -> uid,
# the tuple of (word, average) pairs -> top_word_list.
schema = build_schema()

df_uncasted = spark.createDataFrame(tmp, schema)
#df_uncasted.printSchema()

#df_uncasted.first()

def save_data_frame(df, table_name):
    """Append *df* to Cassandra table *table_name* in keyspace 'twitter'."""
    writer = df.write.format("org.apache.spark.sql.cassandra")
    writer = writer.mode('append')
    writer = writer.options(table=table_name, keyspace='twitter')
    writer.save()
# Append the per-user top words to table batch_test; save_data_frame
# always writes into the 'twitter' keyspace.
save_data_frame( df_uncasted, "batch_test")

KeyboardInterrupt: 

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/local/spark/python/pyspark/shell.py", line 65, in <lambda>
    atexit.register(lambda: sc.stop())
  File "/usr/local/spark/python/pyspark/context.py", line 419, in stop
    self._accumulatorServer.shutdown()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 253, in shutdown
    SocketServer.TCPServer.shutdown(self)
  File "/usr/lib/python2.7/SocketServer.py", line 246, in shutdown
    self.__is_shut_down.wait()
  File "/usr/lib/python2.7/threading.py", line 614, in wait
    self.__cond.wait(timeout)
  File "/usr/lib/python2.7/threading.py", line 340, in wait
    waiter.acquire()
  File "/usr/local/spark/python/pyspark/context.py", line 236, in signal_handler
    self.cancelAllJobs()
  File "/usr/local/spark/python/pyspark/context.py", line 962, in cancelAllJobs
    self._jsc.sc().cancelAllJobs()
Attribut