In [None]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj, sc):
    """Best-effort shutdown of the Spark context and the Spark cluster job.

    Intended to be registered with ``atexit`` so the cluster is released
    even when the script exits unexpectedly.

    Args:
        sj: Object exposing ``stop()`` for the cluster job.
        sc: Object exposing ``stop()`` for the Spark context.

    Returns:
        None. Ordinary errors raised by either ``stop()`` call are reported
        on stderr instead of being silently discarded; the second stop is
        always attempted even if the first one fails.
    """
    def _stop(label, resource):
        # Report (rather than hide) a failed stop so an orphaned cluster
        # can be noticed and cancelled by hand.
        try:
            print('Trapped Exit cleaning up ' + label)
            resource.stop()
        except Exception as err:
            print('Could not stop ' + label + ': ' + repr(err), file=sys.stderr)

    try:
        _stop('Spark Context', sc)
    finally:
        # Runs even if stopping the context was interrupted, so the job
        # allocation is still released.
        _stop('Spark Job', sj)

# Let findspark set up the environment for the local Spark installation.
findspark.init()

# Parameters for the Spark cluster
nodes=3 # Try with 10
tasks_per_node=8 
memory_per_task=1024 #1 gig per process, adjust accordingly, try with 5120 later
# Please estimate walltime carefully to keep unused Spark clusters from sitting
# idle so that others may use the resources.
walltime="1:00" #1 hour
os.environ['SBATCH_PARTITION']='lattice' #Set the appropriate ARC partition

# Request the cluster: total cores = nodes * tasks_per_node, with one
# executor per node (cores_per_executor == tasks_per_node).
sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

# Wait for the job to start, then obtain the SparkContext from it.
sj.wait_to_start()
sc = sj.start_spark()

# Register the exit handler.
# NOTE(review): registration happens only after start_spark() returns, so a
# failure in wait_to_start()/start_spark() is not covered by the handler.
atexit.register(exitHandler,sj,sc)

# You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)


In [None]:
import json
import emoji
from datetime import datetime, timedelta
from timezonefinder import TimezoneFinder
import pytz
import re
import ssl
import certifi
from time import sleep
import nltk 
from nltk import bigrams 
from nltk.util import ngrams
import sys
from pyspark.sql.functions import udf
from pyspark.sql.functions import expr
from pyspark.sql.types import *
import time

# Download the NLTK 'punkt' data when this cell runs; presumably required by
# the nltk.word_tokenize call in hasSpam — TODO confirm.
nltk.download('punkt')

class Logger(object):
    """File-like object that mirrors everything written to it into a log file.

    Text passed to ``write`` goes both to the stream that was ``sys.stdout``
    when the instance was created and to ``file_name`` (opened for writing,
    truncating any existing content). Intended to be assigned to
    ``sys.stdout``.
    """

    def __init__(self, file_name):
        # Capture the current stdout so output still reaches the terminal.
        self.terminal = sys.stdout
        self.log = open(file_name, "w")

    def write(self, message):
        """Write ``message`` to the terminal stream and to the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both underlying streams so buffered log text reaches disk."""
        self.terminal.flush()
        self.log.flush()

    def close(self):
        """Close the log file (the terminal stream is left open)."""
        if not self.log.closed:
            self.log.close()

def parseTimeZone(latitude, longitude):
    """Look up the timezone for a coordinate pair.

    Asks ``TimezoneFinder.timezone_at`` first and, only when that yields
    ``None``, falls back to ``closest_timezone_at`` for the same point.

    Returns:
        Whatever the finder returns for the point (may still be ``None``
        if the fallback finds nothing).
    """
    finder = TimezoneFinder()
    zone_name = finder.timezone_at(lat=latitude, lng=longitude)
    if zone_name is not None:
        return zone_name
    return finder.closest_timezone_at(lat=latitude, lng=longitude)

def parseDate(date_info, latitude, longitude):
    """Convert a tweet's UTC timestamp into the tweeter's local wall-clock time.

    Args:
        date_info: Timestamp string in the form
            ``'%a %b %d %H:%M:%S +0000 %Y'`` (always UTC, per the literal
            ``+0000`` in the format).
        latitude: Latitude used to look up the local timezone.
        longitude: Longitude used to look up the local timezone.

    Returns:
        A naive ``datetime`` holding the local date and time at the given
        coordinates.

    Raises:
        ValueError: If ``date_info`` does not match the expected format.
        Exception: Whatever ``pytz.timezone`` raises when no timezone name
            could be determined for the coordinates.
    """
    utc_time = datetime.strptime(date_info, '%a %b %d %H:%M:%S +0000 %Y')
    tz = parseTimeZone(latitude, longitude)
    local_zone = pytz.timezone(tz)
    # Attach UTC explicitly: astimezone() on a naive datetime would assume the
    # machine's own timezone, and converting an aware value keeps the full
    # offset (including half-hour zones and DST) instead of whole hours only.
    local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(local_zone)
    # Callers only use wall-clock fields, so hand back a naive datetime.
    return local_time.replace(tzinfo=None)

def parseProvince(city_province_string):
    """Extract the province from a ``"City, Province"`` string.

    Args:
        city_province_string: Comma-separated place name, or ``None``
            (e.g. a null column value handed to the UDF).

    Returns:
        The text after the first comma with surrounding whitespace removed,
        or ``None`` when the input is ``None`` or contains no comma.
    """
    if city_province_string is None:
        return None
    city_and_province = city_province_string.split(',')
    if len(city_and_province) > 1:
        return city_and_province[1].strip()
    return None

def getAccountLife(tweet_post_date, user_creation_date):
    """Return the account's age, in days, at the moment the tweet was posted.

    Both arguments are timestamp strings in the form
    ``'%a %b %d %H:%M:%S +0000 %Y'``. The result is a float (fractional
    days) and is negative if the tweet predates the account creation.
    """
    twitter_format = '%a %b %d %H:%M:%S +0000 %Y'
    posted_at = datetime.strptime(tweet_post_date, twitter_format)
    created_at = datetime.strptime(user_creation_date, twitter_format)
    # Elapsed minutes first, then scale to hours and to days.
    minutes_alive = (posted_at - created_at) / timedelta(minutes=1)
    return minutes_alive / 60 / 24

def parseHour(date_info, lattitude, longitude):
    """Return the local hour of a tweet as an int.

    The timestamp is converted with ``parseDate`` for the given coordinates.
    Returns ``None`` if an ``AttributeError`` is raised during the
    conversion.
    """
    try:
        local_time = parseDate(date_info, lattitude, longitude)
        return int(local_time.hour)
    except AttributeError:
        return None

def parseWeekday(date_info, lattitude, longitude):
    """Return the local weekday of a tweet as a string.

    The timestamp is converted with ``parseDate`` for the given coordinates
    and the value of ``weekday()`` on the result is stringified. Returns
    ``None`` if an ``AttributeError`` is raised during the conversion.
    """
    try:
        local_time = parseDate(date_info, lattitude, longitude)
        return str(local_time.weekday())
    except AttributeError:
        return None

def parseDay(date_info, lattitude, longitude):
    """Return the local day of the month of a tweet as a string.

    The timestamp is converted with ``parseDate`` for the given coordinates
    and the ``day`` attribute of the result is stringified. Returns ``None``
    if an ``AttributeError`` is raised during the conversion.
    """
    try:
        local_time = parseDate(date_info, lattitude, longitude)
        return str(local_time.day)
    except AttributeError:
        return None

def getRawCleanWords(tweet_text):
    """Normalise tweet text into lowercase, punctuation-free words.

    Used to check for sleep/stress keywords. The text is passed through a
    fixed sequence of substitutions, emoji are spaced out with ``add_space``
    and converted with ``emoji.demojize``, tokens starting with ``https``
    are dropped, and every character outside ``a-z``, ``0-9``, whitespace
    and ``_`` becomes a space.

    Returns:
        The cleaned tokens joined by single spaces. Tokens are not stripped
        of underscores, and empty tokens are kept, so the result can carry
        leading or trailing spaces.
    """
    text = tweet_text + " ."
    # Applied strictly in this order.
    for old, new in (('&amp;', " and "),
                     ('+', " "),
                     ('=', " "),
                     ('\n', " "),
                     ('@', " AT_"),
                     ('#', " "),
                     ('-', " "),
                     ('\'', "")):
        text = text.replace(old, new)
    text = emoji.demojize(add_space(text))

    scrubbed = []
    for token in re.split(' +', text):
        if token.startswith('https'):
            continue
        token = token.lower().replace("\\", " and ").replace("/", " and ")
        token = re.sub(r'[^a-z0-9\s_]', ' ', token)
        # Strip only when a literal space sits at either end.
        if token.startswith(" ") or token.endswith(" "):
            token = token.strip()
        scrubbed.append(token)

    # Re-split and re-join so runs of spaces inside tokens collapse to one.
    return ' '.join(re.split(' +', ' '.join(scrubbed)))

def hasSleepKeywords(clean_words):
    """Report whether cleaned tweet text mentions a sleep-related keyword.

    Args:
        clean_words: Space-separated, already-cleaned words.

    Returns:
        True if any single word is in the sleep word list or any pair of
        adjacent words is in the sleep bigram list; otherwise False.
    """
    # "doze" and "sleep" are separate entries (a missing comma would fuse
    # adjacent string literals into one word).
    sleep_words = {
        "bed", "bedtime", "hibernation", "slumber", "coma", "doze", "sleep", "sack", "insomnia",
        "dodo", "zzz", "siesta", "tired", "nosleep", "cantsleep", "exhausted", "sleepless",
        "hours", "awake", "late", "rest", "asleep", "slept", "sleeping", "sleepy",
        "shuteye", "nap", "oclock", "melatonin", "caffeine", "coffee", "ambien", "zolpidem",
        "lunesta", "intermezzo", "trazadone", "eszopiclone", "zaleplon",
    }
    sleep_bigrams = {
        ("pass", "out"), ("get", "up"), ("wake", "up"), ("long", "week"), ("long", "night"),
        ("close", "eyes"), ("long", "day"), ("late", "night"), ("nod", "off"), ("bad", "night"),
    }

    tokens = re.split(' +', clean_words)

    if any(token in sleep_words for token in tokens):
        return True

    # Adjacent word pairs, in order.
    return any(pair in sleep_bigrams for pair in zip(tokens, tokens[1:]))

def hasStressKeywords(clean_words):
    """Report whether cleaned tweet text mentions a stress-related keyword.

    Args:
        clean_words: Space-separated, already-cleaned words.

    Returns:
        True if any single word is in the stress word list or any pair of
        adjacent words is in the stress bigram list; otherwise False.
    """
    # "kill" and "baby" are separate entries (a missing comma would fuse
    # adjacent string literals into one word).
    stress_words = {
        "cold", "sick", "life", "stress", "school", "depression", "fucking",
        "stressor", "anxiety", "pressure", "depressed", "study", "heart", "pain",
        "stressful", "job", "hate", "shit", "fuck", "suffer", "die", "kill",
        "baby", "surgery", "therapy",
    }
    stress_bigrams = {
        ("work", "today"), ("busy", "work"), ("dont", "feel"), ("years", "ago"), ("dont", "care"),
        ("long", "time"),
        ("mental", "health"), ("feel", "bad"), ("suicidal", "thoughts"), ("feel", "guilty"),
        ("hard", "time"), ("mental", "illness"),
    }

    tokens = re.split(' +', clean_words)

    if any(token in stress_words for token in tokens):
        return True

    # Adjacent word pairs, in order.
    return any(pair in stress_bigrams for pair in zip(tokens, tokens[1:]))

def hasSpam(clean_words):
    """Report whether cleaned tweet text contains a known spam trigram.

    The text is tokenised with ``nltk.word_tokenize`` and every run of three
    consecutive tokens is compared against a fixed list of spam phrases.

    Returns:
        True on the first matching trigram, otherwise False.
    """
    spam_trigrams = {
        ("in", "our", "bio"),
        ("click", "the", "link"),
        ("click", "to", "apply"),
        ("a", "great", "fit"),
        ("for", "rent", "on"),
        ("recommend", "anyone", "for"),
        ("were", "hiring", "in"),
        ("if", "youre", "looking"),
        ("fit", "for", "you"),
        ("interested", "in", "a"),
        ("apartments", "for", "rent"),
        ("score", "a", "job"),
    }
    tokens = nltk.word_tokenize(clean_words)
    return any(tuple(trigram) in spam_trigrams for trigram in ngrams(tokens, 3))

def negation_sub(text):
    """Prefix ``NEG_`` to every word between a negation word and punctuation.

    A scope starts at a negation word (matched case-insensitively) and runs
    through word/whitespace characters up to the next ``. , : ; ! ?``. Text
    with no closing punctuation after the negation is left unchanged.
    """
    negation_scope = re.compile(
        r'\b(?:not|no|never|aint|doesnt|havent|lacks|none|mightnt|shouldnt|'
        r'cannot|dont|neither|nor|mustnt|wasnt|cant|hadnt|isnt|neednt|without|'
        r'darent|hardly|lack|nothing|oughtnt|wouldnt|didnt|hasnt|lacking|nobody|'
        r'nowhere|shant)\b[\w\s]+[.,:;!?]',
        flags=re.IGNORECASE)

    def _tag_scope(match):
        # Only words preceded by whitespace are tagged, so the negation word
        # that opens the scope stays as it is.
        return re.sub(r'(\s+)(\w+)', r'\1NEG_\2', match.group(0))

    return negation_scope.sub(_tag_scope, text)

def is_emoji(s):
    """Return True when ``s`` is contained in ``emoji.UNICODE_EMOJI``.

    NOTE(review): the layout of ``emoji.UNICODE_EMOJI`` is assumed to be
    keyed by emoji characters — confirm against the installed emoji package
    version.
    """
    known_emoji = emoji.UNICODE_EMOJI
    return s in known_emoji

def add_space(text):
    """Isolate each emoji character and tag it with an ``emoji_`` prefix.

    Every character recognised by ``is_emoji`` is emitted as a space, then
    ``emoji_`` + the character, then another space; all other characters
    pass through unchanged. Leading/trailing whitespace of the final string
    is stripped.
    """
    pieces = []
    for symbol in text:
        if is_emoji(symbol):
            pieces.append(' ')
            pieces.append('emoji_' + symbol + " ")
        else:
            pieces.append(symbol)
    return ''.join(pieces).strip()


def parseTweetText(tweet_text):
    """Produce the final filtered tweet text with negation tagging.

    Steps, in order: apply the fixed character substitutions, space out and
    demojize emoji, drop tokens starting with ``https``, lowercase and
    remove characters other than ``a-z``, ``0-9``, whitespace, ``_`` and
    ``. , : ; ! ?``; run ``negation_sub`` over the joined text; then replace
    the remaining punctuation with spaces, trim spaces and underscores from
    each token, drop empty tokens and re-attach ``emoji_`` prefixes to the
    word that follows them.
    """
    text = tweet_text + " ."
    # Applied strictly in this order.
    for old, new in (('&amp;', " and "),
                     ('+', " "),
                     ('=', " "),
                     ('\n', " "),
                     ('@', " AT_"),
                     ('#', " "),
                     ('-', " "),
                     ('\'', "")):
        text = text.replace(old, new)
    text = emoji.demojize(add_space(text))

    # Pass 1: keep sentence punctuation so negation scopes can be detected.
    first_pass = []
    for token in re.split(' +', text):
        if token.startswith('https'):
            continue
        token = token.lower().replace("\\", " and ").replace("/", " and ")
        token = re.sub(r'[^a-z0-9\s.,:;!?_]', '', token).strip()
        if token != "":
            first_pass.append(token)

    negated = negation_sub(' '.join(first_pass))

    # Pass 2: punctuation has served its purpose; remove it and tidy tokens.
    second_pass = []
    for token in re.split(' +', negated):
        token = re.sub(r'[.,:;!?]', ' ', token)
        # Strip only when a literal space sits at either end.
        if token.startswith(" ") or token.endswith(" "):
            token = token.strip()
        token = token.strip("_")
        if token != "":
            second_pass.append(token)

    return ' '.join(second_pass).replace("emoji_ ", "emoji_")


def remove_comma(orig_text):
    """Replace newlines and commas with spaces so the text is CSV-safe.

    Args:
        orig_text: Text to sanitise, or ``None`` (e.g. a null column value
            handed to the UDF).

    Returns:
        The text with every ``"\\n"`` and ``","`` replaced by a single
        space, or ``None`` when the input is ``None``.
    """
    if orig_text is None:
        return None
    return orig_text.replace("\n", " ").replace(",", " ")

def map_province(province):
    """Map known mis-parsed province values to the correct province name.

    Values that are not in the correction table are returned unchanged.
    """
    corrections = {
        "Subd. C": "Newfoundland and Labrador",
        "Toronto": "Ontario",
        "Subd. B": "Nova Scotia",
        "Vancouver": "British Columbia",
        "Subd. A": "Newfoundland and Labrador",
        "Montréal": "Québec",
        "Subd. D": "Newfoundland and Labrador",
        "Calgary": "Alberta",
        "Subd. O": "Newfoundland and Labrador",
        "Nouveau-Brunswick": "New Brunswick",
    }
    return corrections.get(province, province)


# Start the wall-clock timer reported at the end of the run.
start_time = time.time()

# Uncomment to mirror stdout into a log file via the Logger class.
# sys.stdout = Logger("test_file_2_log_test_2.txt")
outputFileName = "test_files_1_output_test.csv"
# Load the raw tweets (JSON) into a Spark DataFrame.
df = sqlCtx.read.json("/home/cheunw/Group Project Folder/Project Raw Data Files/1.json")
print("Number of tweets in dataframe", df.count())
# df.printSchema()

# Columns pulled out of the nested tweet structure, renamed to flat names.
# longitude/latitude are taken from element [0][0] of the place bounding box
# coordinates (index 0 = longitude, index 1 = latitude).
desired_columns = (
    expr("created_at as tweet_date"),
    expr("text as orig_text"),
    expr("user.id as user_id"),
    expr("user.verified as user_verified"),
    expr("user.followers_count as user_followers_count"),
    expr("user.friends_count as user_friends_count"),
    expr("user.listed_count as user_listed_count"),
    expr("user.statuses_count as user_statuses_count"),
    expr("user.created_at as user_creation_date"),
    expr("place.name as city"),
    expr("place.full_name as city_and_province"),
    expr("place.country as country"),
    expr("place.bounding_box.coordinates[0][0][0] as longitude"),
    expr("place.bounding_box.coordinates[0][0][1] as latitude"),
    "lang", "is_quote_status", "retweeted")

# Keep only English tweets placed in Canada that are neither flagged as
# retweeted nor quote statuses.
content = df.select(*desired_columns)
content = content.where(content.country == "Canada")
content = content.where(content.lang == "en")
content = content.where(content.retweeted == False)
content = content.where(content.is_quote_status == False)

# USER DEFINED FUNCTIONS WITH SPARK DATAFRAMES
# Each Python helper is wrapped as a Spark UDF and applied column by column.
# Replace commas/newlines in the original text (the output is meant for CSV).
udf_remove_comma = udf(remove_comma, StringType())
content = content.withColumn("orig_text", udf_remove_comma("orig_text"))

# CHECK FOR SLEEP/STRESS
# raw_cleaned_text is the keyword-matching form of the tweet; the two marker
# columns are booleans derived from it.
udf_get_raw_clean_words = udf(getRawCleanWords, StringType())
content = content.withColumn("raw_cleaned_text", udf_get_raw_clean_words("orig_text"))
udf_get_sleep_marker = udf(hasSleepKeywords, BooleanType())
content = content.withColumn("sleep_marker", udf_get_sleep_marker("raw_cleaned_text"))
udf_get_stress_marker = udf(hasStressKeywords, BooleanType())
content = content.withColumn("stress_marker", udf_get_stress_marker("raw_cleaned_text"))

# KEEP ONLY TWEETS FLAGGED FOR SLEEP OR STRESS
content = content.where((content.sleep_marker == True) | (content.stress_marker == True))

# CREATE SPAM FLAG
udf_spam_flag = udf(hasSpam, BooleanType())
content = content.withColumn("spam_marker", udf_spam_flag("raw_cleaned_text"))

# DROP TWEETS FLAGGED AS SPAM
content = content.where((content.spam_marker == False))

# ADD CITY AND PROVINCE
# The province is parsed from "City, Province" and then passed through
# map_province to correct known mis-parsed values.
content = content.withColumn("city", udf_remove_comma("city"))
udf_city_province_string_to_province = udf(parseProvince, StringType())
content = content.withColumn("province", udf_city_province_string_to_province("city_and_province"))
udf_map_provinces = udf(map_province, StringType())
content = content.withColumn("province", udf_map_provinces("province"))

# KEEP ONLY ROWS WHOSE PROVINCE IS ONE OF THE NAMES LISTED BELOW
# NOTE(review): Nunavut is not in this list — confirm that is intentional.
content = content.where((content.province == "Ontario") |
                        (content.province == "Alberta") |
                        (content.province == "British Columbia") |
                        (content.province == "Québec") |
                        (content.province == "Manitoba") |
                        (content.province == "Yukon") |
                        (content.province == "Nova Scotia") |
                        (content.province == "Northwest Territories") |
                        (content.province == "Newfoundland and Labrador") |
                        (content.province == "New Brunswick") |
                        (content.province == "Saskatchewan") |
                        (content.province == "Prince Edward Island"))

# GET THE TWEET HOUR, WEEKDAY, AND DAY.  FILTER OUT IF NULL.
# Each of the three UDFs converts the timestamp to local time on its own.
udf_get_tweet_hour = udf(parseHour, IntegerType())
content = content.withColumn("tweet_hour", udf_get_tweet_hour("tweet_date", "latitude", "longitude"))

udf_get_tweet_weekday = udf(parseWeekday, StringType())
content = content.withColumn("tweet_weekday", udf_get_tweet_weekday("tweet_date", "latitude", "longitude"))

udf_get_tweet_day = udf(parseDay, StringType())
content = content.withColumn("tweet_day", udf_get_tweet_day("tweet_date", "latitude", "longitude"))

content = content.where(content.tweet_hour.isNotNull())
content = content.where(content.tweet_day.isNotNull())
content = content.where(content.tweet_weekday.isNotNull())

# FINALLY, GET FILTERED TEXT AND ACCOUNT LIFE FOR REMAINING ROWS OF DF
udf_get_final_cleaned_tweet = udf(parseTweetText, StringType())
content = content.withColumn("filtered_text", udf_get_final_cleaned_tweet("orig_text"))

udf_get_account_life_days = udf(getAccountLife, DoubleType())
content = content.withColumn("account_life_days", udf_get_account_life_days("tweet_date", "user_creation_date"))

# print("Printing new schema")
content.printSchema()
# content.show(3, False)


# QUERY TESTS

# Final column set for the cleaned data; the intermediate marker columns
# (sleep_marker, stress_marker, spam_marker) and the raw date strings are
# not included.
columns_to_keep_in_clean_data = ("filtered_text",
                                 "orig_text",
                                 "raw_cleaned_text",
                                 "account_life_days",
                                 "latitude", "longitude",
                                 "city", "province", "country",
                                 "tweet_day", "tweet_hour", "tweet_weekday", "user_id",
                                 "user_followers_count", "user_friends_count", "user_listed_count",
                                 "user_statuses_count",
                                 "user_verified")

# EXTRACT THE TWEETS WITH GOOD COLUMNS
content = content.select(*columns_to_keep_in_clean_data)

# count() is a Spark action, so this is where the filtering/UDF pipeline
# above is actually executed.
print("Number of extracted tweets:", content.count())

# WRITE THE TWEETS TO THE CSV
# NOTE(review): the write below is commented out, yet the final message still
# reports that writing completed — re-enable the write or adjust the message.
# content.write.option("header", "false").csv(outputFileName)

end_time = time.time()
print("Time of execution in seconds:", end_time - start_time)
print("Writing to", outputFileName, "complete")


[nltk_data] Downloading package punkt to /home/cheunw/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
