## Part 1

In [1]:
import sys
# Record the interpreter and Spark versions this notebook was run with.
# `spark` is not created in this cell; it has to exist in the kernel already.
print(sys.version, spark.version, sep="\n")

3.8.15 | packaged by conda-forge | (default, Nov 22 2022, 08:46:39) 
[GCC 10.4.0]
3.1.3


In [2]:
import os
import time
import subprocess

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql import SQLContext
# SQL entry point built on the SparkContext `sc`. `sc` is not defined in any
# visible cell -- presumably injected by the kernel at start-up (TODO confirm).
# NOTE(review): `sqlContext` is not referenced again in the visible cells.
sqlContext = SQLContext(sc)

In [3]:
# `warnings` and `SparkSession` are used below but no visible cell imports them;
# import them explicitly so this cell does not depend on names pre-loaded by the
# kernel's start-up script.
import warnings

from pyspark.sql import SparkSession

# Silence Python warnings for the rest of the session to keep cell output readable.
warnings.filterwarnings(action='ignore')

# Returns the session that is already running if there is one (the first cell
# already reads `spark.version`), otherwise creates a new one.
spark = SparkSession.builder.getOrCreate()

## Add "eagerEval.enabled" to beautify the way Spark DF is displayed:
## a DataFrame left as the last expression of a cell is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

## To use legacy casting notation for date
## (presumably required by the `to_timestamp` patterns used later -- TODO confirm).
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [None]:
# Location of the raw tweet dump in Google Cloud Storage.
bucket_name = 'ksr-tweet-bucket'
prefix = 'tweet_project'

# Report the total size of the dataset with `hadoop fs -du -s -h`.
# The command is passed as an argument list, so no shell is spawned and the
# path is never subject to shell parsing.
cmd = ['hadoop', 'fs', '-du', '-s', '-h', 'gs://' + bucket_name + '/' + prefix + '/']

# The context manager closes the stdout pipe once the output has been read.
# stderr is merged into stdout so error messages show up in the cell output.
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                      universal_newlines=True) as dat:
    for line in dat.stdout:
        # Each line already ends with a newline; do not add a second one.
        print(line, end='')
    # Exit status of the hadoop command (0 on success).
    vol = dat.wait()

In [None]:
# List the first entries under the dataset prefix (shell escape; `head` keeps the output short).
# NOTE(review): the path is hard-coded here instead of reusing `bucket_name` / `prefix`.
!hadoop fs -ls 'gs://ksr-tweet-bucket/tweet_project/' | head

In [None]:
# Load every JSON file under the dataset prefix into one DataFrame.
# No schema is supplied, so Spark has to infer it from the data.
raw_tweets = spark.read.json(f'gs://{bucket_name}/{prefix}')


23/03/10 19:36:59 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
23/03/10 19:37:14 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
23/03/10 19:45:03 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
23/03/10 20:25:25 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [None]:
# Number of rows in the raw dataset, before any filtering.
print(raw_tweets.count())



99994342


                                                                                

In [None]:
# Peek at the first 5 rows to inspect the available columns.
raw_tweets.show(5)

[Stage 5:>                                                          (0 + 1) / 1]

+-----------+--------------------+------------------+--------------------+-----------------+--------------------+--------------+---------+------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----+------------------+-----------+--------------------+-------------------+--------------------+-----------------------+--------------------+-----------+-------------+---------+--------------+--------------------+--------------------+--------------------+-------------+---------+--------------------+--------------------+---------------------+
|coordinates|          created_at|display_text_range|            entities|extended_entities|      extended_tweet|favorite_count|favorited|filter_level| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote

                                                                                

In [None]:
# Keep only tweets whose `lang` field is "en".
# Note: this rebinds `raw_tweets`, so from here on the name refers to the English subset.
is_english = raw_tweets["lang"] == "en"
raw_tweets = raw_tweets.filter(is_english)

In [None]:
from pyspark.sql.functions import col, lower, regexp_replace, trim

# Normalise the tweet text:
#   tweet_text -> lower-cased in place
#   stripped   -> tweet_text with every character other than letters and spaces
#                 removed, then leading/trailing whitespace trimmed
raw_tweets = (
    raw_tweets
    .withColumn('tweet_text', lower(col('tweet_text')))
    .withColumn('stripped', trim(regexp_replace(col("tweet_text"), "[^a-zA-Z ]", "")))
)

In [None]:
# Terms related to K-12 & higher education. A tweet is kept when its text
# contains at least one of them. Matching is done with a regular expression
# against the lower-cased `tweet_text`, anywhere in the string.
# NOTE(review): mixed-case entries ("K-12", "PhD", "STEM", "MOOC") can only
# match lower-cased text if they are lower-cased before matching.
edu_words = [
    # school levels
    "K-12", "primary school", "middle school", "secondary school",
    "high school", "kindergarten", "preschool",
    # people and academic stages
    "teacher", "educationist", "academia", "undergrad", "graduate",
    "undergraduate", "PhD",
    # subjects
    "STEM", "math", "mathematics", "science", "physics", "chemistry",
    "biology", "humanities", "history", "philosophy",
    # teaching and learning
    "alma mater", "educating", "teaching", "curriculum", "syllabus",
    "online learning", "educational", "textbook", "schooling", "training",
    "knowledge", "scholarship", "literacy", "tuition", "academic", "course",
    "classroom", "classwork", "social-emotional learning",
    # higher education
    "college", "university", "student", "professor", "campus", "degree",
    "major", "minor", "enrollment", "application", "admission",
    "graduate school", "scholar", "research", "thesis", "dissertation",
    "lecture", "academic paper", "institute", "study abroad",
    "distance learning", "tutor", "peer review", "online course", "MOOC",
    "learning outcome", "education reform",
]


# Terms that disqualify a tweet: any tweet whose text contains one of them is
# dropped, even if it also contains an education term.
# NOTE(review): matching is by substring, so "high" also excludes every tweet
# containing "high school" (an education term above), and short entries such as
# "bet", "gun" or "sex" also hit inside longer words -- confirm this is intended.
# NOTE(review): "Uvalde" is capitalised; see the case note on `edu_words`.
rm_words = [
    # violence
    "Uvalde", "shoot", "shooting", "shootings", "gun", "guns", "gunned",
    "kill", "killed", "murder", "deceased", "attack",
    # adult content
    "horny", "porn", "sexy", "nude", "naked",
    # drugs
    "drug", "drugs", "dope", "high", "addict", "addiction", "stoned", "weed",
    "cocaine", "heroin", "opioid",
    # gambling
    "gamble", "gambling", "betting", "bet", "casino", "poker", "slot",
    # adult services
    "sex", "escort", "prostitute",
    # money and scams
    "money", "cash", "payday", "loan", "debt", "bankrupt", "bankruptcy",
    "fraud", "scam", "scammer",
    # hacking and malware
    "hack", "hacker", "hacking", "virus", "malware", "spyware",
]

import re

def create_regex(word_list):
    """
    Build one alternation pattern out of a list of literal words.

    Each word is escaped with `re.escape`, wrapped in its own capture group,
    and the groups are joined with `|`, e.g. ["a", "b"] -> "(a)|(b)".
    An empty list yields the empty string.
    """
    groups = []
    for term in word_list:
        groups.append("({})".format(re.escape(term)))
    return "|".join(groups)

# Create regular expressions for education and removal words.
# `tweet_text` has already been lower-cased and `rlike` is case-sensitive, so
# the words are lower-cased here as well; otherwise mixed-case entries such as
# "K-12", "PhD", "STEM", "MOOC" and "Uvalde" could never match.
edu_regex = create_regex([word.lower() for word in edu_words])
rm_regex = create_regex([word.lower() for word in rm_words])

# Filter tweets based on the regular expressions:
#   1. keep tweets whose text contains at least one education term,
#   2. then drop those that also contain any removal term.
# `rlike` matches anywhere in the string, so terms also hit inside longer words.
edu_tweets = (
    raw_tweets
    .filter(raw_tweets['tweet_text'].rlike(edu_regex))
    .filter(~raw_tweets['tweet_text'].rlike(rm_regex))
)


In [None]:
# Number of tweets left after the education / removal-word filters.
edu_tweets.select(count('*').alias('after_filt')).show()

In [None]:
# First 5 filtered tweets. The bare DataFrame is rendered as a table because
# `spark.sql.repl.eagerEval.enabled` is switched on earlier in the notebook.
edu_tweets.limit(5)

In [None]:
# Summary statistics of the filtered tweets (displayed via eager evaluation).
edu_tweets.describe()

In [None]:
## Fraction of null values in each top-level column (a ratio between 0 and 1,
## not multiplied by 100): rows where the column is null / total rows.
edu_tweets.select(
    [
        (count(when(col(name).isNull(), name)) / count(lit(1))).alias(name)
        for name in edu_tweets.schema.names
    ]
).show(truncate=True)

In [None]:
# Top-level tweet fields to keep; each is exposed as `tweet_<field>`.
tweet_columns = [
    "coordinates", "created_at", "id_str", "retweeted_status",
    "tweet_text", "text",
]

# Fields taken from the nested `user` struct; each is exposed as `user_<field>`.
user_columns = [
    "created_at", "description", "followers_count", "id_str",
    "name", "screen_name", "verified", "location",
]

# Engagement counters taken from the nested `retweeted_status` struct; names are kept as-is.
retweet_columns = ["retweet_count", "favorite_count", "reply_count", "quote_count"]

# NOTE(review): defined but not used by the projection below.
quoted_columns = ["quote_count"]

# Flatten the selected fields into one wide frame, then parse both `created_at`
# strings into timestamps. The pattern is interpreted under the LEGACY time
# parser policy configured earlier in the notebook.
new_df = (
    edu_tweets
    .select([
        *[col(f'user.{field}').alias(f'user_{field}') for field in user_columns],
        *[col(field).alias(f'tweet_{field}') for field in tweet_columns],
        *[col(f'retweeted_status.{field}').alias(field) for field in retweet_columns],
    ])
    .withColumn(
        'user_created_at',
        to_timestamp(col('user_created_at'), 'EEE MMM dd HH:mm:ss zzzzz yyyy'),
    )
    .withColumn(
        'tweet_created_at',
        to_timestamp(col('tweet_created_at'), 'EEE MMM dd HH:mm:ss zzzzz yyyy'),
    )
)

new_df.show(5, truncate=True)

In [None]:
# Persist both stages as Parquet, replacing whatever an earlier run left there.

# Filtered tweets with the full original schema.
(
    edu_tweets.write
    .mode('overwrite')
    .format("parquet")
    .save('gs://ksr-tweet-bucket/filtered')
)

# Flattened projection built from the filtered tweets.
(
    new_df.write
    .mode('overwrite')
    .format("parquet")
    .save('gs://ksr-tweet-bucket/processed')
)

In [None]:
%time