## Tweet project - Part 1

#### 1. Load data
#### 2. Select relevant tweets
#### 3. Select important features to be used in model
#### 4. Save the filtered data in parquet format for later use

In [1]:
import sys
print(sys.version)
print(spark.version)

3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:10) 
[GCC 10.3.0]
3.1.3


In [2]:
import os
import time
import subprocess

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
warnings.filterwarnings(action='ignore')
spark = SparkSession.builder.getOrCreate()

##Add "eagerEval.enabled" to beautify the way Spark DF is displayed
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

## To use legacy casting notation for date
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

## 1. Load data

### 1.1 Check total data volume in bucket. Should be around ~500 GB

In [4]:
## This data was downloaded using the twitter API and stored in a personal bucket on Google Cloud

bucket_name = 'prayutjain-tweet-bucket' # Personal dir
prefix = 'tweet_project'

cmd = 'hadoop fs -du -s -h ' + 'gs://' + bucket_name + '/' + prefix + '/'

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
for line in p.stdout.readlines():
    print (line)
    
retval = p.wait()

497.0 G  497.0 G  gs://msca-bdp-tweets/final_project



In [5]:
# !hadoop fs -ls 'gs://prayutjain-tweet-bucket/tweet_project/' | head 

Found 36976 items
-rwx------   3 root root          0 2022-11-06 23:17 gs://msca-bdp-tweets/final_project/_SUCCESS
-rwx------   3 root root    6007332 2022-11-06 23:12 gs://msca-bdp-tweets/final_project/part-00000-f654a635-796b-4190-88ae-6c2ee7e6f3a3-c000.json
-rwx------   3 root root    5638649 2022-11-06 23:12 gs://msca-bdp-tweets/final_project/part-00001-f654a635-796b-4190-88ae-6c2ee7e6f3a3-c000.json
-rwx------   3 root root    6649652 2022-11-06 23:12 gs://msca-bdp-tweets/final_project/part-00002-f654a635-796b-4190-88ae-6c2ee7e6f3a3-c000.json
-rwx------   3 root root    6921001 2022-11-06 23:12 gs://msca-bdp-tweets/final_project/part-00003-f654a635-796b-4190-88ae-6c2ee7e6f3a3-c000.json
-rwx------   3 root root    6626757 2022-11-06 23:12 gs://msca-bdp-tweets/final_project/part-00004-f654a635-796b-4190-88ae-6c2ee7e6f3a3-c000.json
-rwx------   3 root root    6696245 2022-11-06 23:12 gs://msca-bdp-tweets/final_project/part-00005-f654a635-796b-4190-88ae-6c2ee7e6f3a3-c000.json
-rwx-----

### 1.2 Read .json files

In [None]:
%time tweets_raw = spark.read.json('gs://' + bucket_name + '/' + prefix)


22/12/05 20:27:55 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

CPU times: user 1.26 s, sys: 259 ms, total: 1.52 s
Wall time: 6min 38s


22/12/05 20:33:27 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## 2. Select education relevant tweets

### 2.1 Filter tweets for "K-12/education" related topics

In [6]:
## Stripped text - all low case, broken into words 

tweets_raw = tweets_raw\
.withColumn('tweet_text', lower('tweet_text'))\
.withColumn('stripped', regexp_replace(col("tweet_text"),"[\$#,&%\".]",""))
#withColumn('match_case', array(filt_words1))
#withColumn('stripped', split('stripped', ' '))


In [7]:
## Dictionary for words similar to 'education/K-12' 

dict_filt = ["primary school","schools","education","k12","high school","teacher","higher secondary",
             "senior secondary","sophomore","math","mathematics","science","physics","chemistry","biology","humanities",
             "history","philosophy","alma mater","academia","educating","teaching","curriculum","online learning",
             "educational","textbook","kindergarten","schooling","k-12","social-emotional learning","training","knowledge",
             "scholarship","literacy","schooling","tuition","undergraduate","academic","course","graduate","stem","phd",
             "classwork","classroom","preschool","educationist","syllabus","middle school","secondary school","undergrad"]

dict_filt1='|'.join(["(" + c +")" for c in dict_filt])

dict_rm = ["shoot","kill","killed","deceased","murder","attack","horny",
           "shooting","shootings","gunned","gun","guns","uvalde"]

dict_rm1='|'.join(["(" + c +")" for c in dict_rm])

tweets_filt = tweets_raw.where(tweets_raw['tweet_text'].rlike(dict_filt1)).\
where(~tweets_raw['tweet_text'].rlike(dict_rm1))

In [None]:
## Check which words are selecting the most filtered tweets
res = []
for keys in dict_filt:
    filter_string = 'tweet_text like "%' + keys + '%"'
    temp = tweets_filt.filter(filter_string).count()
    res.append([temp, keys])
    
sorted(res,key=lambda l:l[0], reverse = True)[:20]

In [None]:
tweets_raw.select(count('*').alias('before_filt')).show()
tweets_filt.select(count('*').alias('after_filt')).show()

22/12/05 21:34:42 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1670262909868_0003_01_000067 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-w6z6.c.msca-bdp-students.internal. Exit status: 143. Diagnostics: [2022-12-05 21:34:42.221]Container killed on request. Exit code is 143
[2022-12-05 21:34:42.221]Container exited with a non-zero exit code 143. 
[2022-12-05 21:34:42.224]Killed by external signal
.
22/12/05 21:34:42 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1670262909868_0003_01_000066 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-w6z6.c.msca-bdp-students.internal. Exit status: 143. Diagnostics: [2022-12-05 21:34:42.228]Container killed on request. Exit code is 143
[2022-12-05 21:34:42.229]Container exited with a non-zero exit code 143. 
[2022-12-05 21:34:42.231]Killed by external signal
.
22/12/05 21:34:42 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 65

+-----------+
|before_filt|
+-----------+
|   99992797|
+-----------+



22/12/05 21:38:08 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 62 for reason Container marked as failed: container_1670262909868_0003_01_000064 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-wd0p.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/05 21:38:08 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 63 for reason Container marked as failed: container_1670262909868_0003_01_000065 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-wd0p.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/05 21:38:08 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 63 on hub-msca-bdp-dphub-students-backup-prayutjain-sw-wd0p.c.msca-bdp-students.internal: Container marked as failed: container_1670262909868_0003_0

+----------+
|after_filt|
+----------+
|  35186761|
+----------+



                                                                                

## 3. Feature selection

### 3.1 Check data coverage

In [None]:
## Parent field coverage - low for some field: coordinates, extended_entities, geo, place, quoted_status_* .... 

tweets_filt.select([(count(when(col(c).isNull(), c))/count(lit(1))).alias(c) for c in tweets_filt.schema.names]).show(truncate=True)

### 3.2 Select only the well populated cols

In [9]:
tweet_cols = ["coordinates","created_at","id_str","lang","possibly_sensitive","retweeted_status",
              "tweet_text","timestamp_ms","quoted_status","text"]

user_cols = ["created_at","description","favourites_count","followers_count","friends_count","id_str",
            "name","protected","screen_name","statuses_count","verified","withheld_in_countries","location"]

ent_cols = ["hashtags"]

retweet_cols = ["retweet_count","favorite_count","reply_count","quote_count"]

quoted_cols = ["quote_count"]

df = tweets_filt.select([*[col('user.' + col_name).alias('user_' + col_name) for col_name in user_cols],
                                  *[col(col_name).alias('tweet_' + col_name) for col_name in tweet_cols],
                                  *[col('entities.' + col_name).alias(col_name) for col_name in ent_cols],
                            *[col('retweeted_status.' + col_name).alias(col_name) for col_name in retweet_cols]])\
.withColumn('user_created_at',to_timestamp(col('user_created_at'),'EEE MMM dd HH:mm:ss zzzzz yyyy'))\
.withColumn('tweet_created_at',to_timestamp(col('tweet_created_at'),'EEE MMM dd HH:mm:ss zzzzz yyyy'))

df.show(5, truncate=True)

                                                                                

+-------------------+--------------------+---------------------+--------------------+------------------+-------------------+--------------------+--------------+----------------+-------------------+-------------+--------------------------+--------------------+-----------------+-------------------+-------------------+----------+------------------------+----------------------+--------------------+------------------+-------------------+--------------------+--------+-------------+--------------+-----------+-----------+
|    user_created_at|    user_description|user_favourites_count|user_followers_count|user_friends_count|        user_id_str|           user_name|user_protected|user_screen_name|user_statuses_count|user_verified|user_withheld_in_countries|       user_location|tweet_coordinates|   tweet_created_at|       tweet_id_str|tweet_lang|tweet_possibly_sensitive|tweet_retweeted_status|    tweet_tweet_text|tweet_timestamp_ms|tweet_quoted_status|          tweet_text|hashtags|retweet_count|

## 4. Save filtered and selected features data

In [None]:
tweets_filt.write.format("parquet").\
mode('overwrite').\
save('gs://prayutjain-tweet-bucket/filtered_data')

df.write.format("parquet").\
mode('overwrite').\
save('gs://prayutjain-tweet-bucket/processed_data')

22/12/05 20:35:29 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 35 for reason Container marked as failed: container_1670262909868_0003_01_000035 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-wd0p.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/05 20:35:29 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 31 for reason Container marked as failed: container_1670262909868_0003_01_000031 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-wd0p.c.msca-bdp-students.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
22/12/05 20:36:13 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1670262909868_0003_01_000043 on host: hub-msca-bdp-dphub-students-backup-prayutjain-sw-587z.c.msca-bdp-students.internal. Exit statu