In [1]:
import sys
print(sys.version)
print(spark.version)

3.8.15 | packaged by conda-forge | (default, Nov 22 2022, 08:46:39) 
[GCC 10.4.0]
3.1.3


In [2]:
import pandas as pd
import numpy as np
pd.set_option('display.max_colwidth', None)
pd.reset_option('display.max_rows')
from itertools import compress 
from pyspark.sql.functions import *
from pyspark.sql.types import *
import seaborn as sns
import matplotlib.pyplot as plt
warnings.filterwarnings(action='ignore')

# to beautify the spark dataframe
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

### Load the Data

In [3]:
!hadoop fs -ls 'gs://msca-bdp-tweets/final_project/' | head 

Found 50696 items
-rwx------   3 root root          0 2023-02-08 13:58 gs://msca-bdp-tweets/final_project/_SUCCESS
-rwx------   3 root root    4500466 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00000-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4107431 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00001-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4672123 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00002-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    5186684 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00003-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4729662 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00004-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx------   3 root root    4605529 2023-02-08 13:44 gs://msca-bdp-tweets/final_project/part-00005-aa6d3cb4-7022-4df2-9921-218307589ce2-c000.json
-rwx-----

We have data spread over 50696 json files

In [4]:
# Reading data from the bucket
bucket_name = 'msca-bdp-tweets'
folder_name = 'final_project'

# load the data into spark dataframe
tweets_raw = spark.read.json('gs://' + bucket_name + '/' + folder_name)

# count the number of records
print('Count of records:', tweets_raw.count())

# display 5 records
tweets_raw.limit(5)

23/03/04 03:47:33 WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
23/03/04 03:53:37 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Count of records: 99994342


                                                                                

coordinates,created_at,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,place,possibly_sensitive,quote_count,quoted_status,quoted_status_id,quoted_status_id_str,quoted_status_permalink,quoted_text,reply_count,retweet_count,retweeted,retweeted_from,retweeted_status,source,text,timestamp_ms,truncated,tweet_text,user,withheld_in_countries
,Wed May 25 00:46:...,,"{[], null, [], []...",,,0,False,low,,1529262624952569856,1529262624952569856,,,,,,False,en,,,0,,,,,,0,0,RT,nicolesjchung,"{null, Tue May 24...","<a href=""http://t...",RT @nicolesjchung...,1653439596771,False,it's school. they...,"{false, Thu Dec 0...",
,Wed May 25 00:46:...,,"{[], null, [], []...",,,0,False,low,,1529262625057607680,1529262625057607680,,,,,,False,en,,,0,,,,,,0,0,RT,KingJames,"{null, Tue May 24...","<a href=""http://t...",RT @KingJames: My...,1653439596796,False,My thoughts and p...,"{false, Thu Oct 1...",
,Wed May 25 00:46:...,"[27, 122]","{[], null, [], []...",,,0,False,low,,1529262625099419648,1529262625099419648,SaraIsaacson4,1.5292059121829683e+18,1.5292059121829683e+18,1.1129219300159567e+18,1.1129219300159567e+18,False,en,"{{[[[-105.053666,...",,0,,,,,,0,0,,Leahgreenb I am m...,,"<a href=""http://t...",@SaraIsaacson4 @L...,1653439596806,False,@SaraIsaacson4 @L...,"{false, Sun Feb 2...",
,Wed May 25 00:46:...,,"{[], null, [], []...",,,0,False,low,,1529262625296613376,1529262625296613376,,,,,,False,en,,,0,,,,,,0,0,RT,priore_ashley,"{null, Wed May 25...","<a href=""http://t...",RT @priore_ashley...,1653439596853,False,Gun violence epid...,"{false, Tue Aug 0...",
,Wed May 25 00:46:...,,"{[], null, [], [{...",,"{[0, 253], {[], n...",0,False,low,,1529262625166708739,1529262625166708739,,,,,,False,en,,False,0,,,,,,0,0,,,,"<a href=""http://t...",WATCH LIVE: Presi...,1653439596822,True,WATCH LIVE: Presi...,"{false, Sat Oct 2...",


The original data has 99994342 tweets which is approximately ~100M.

### Keep only english tweets and tweets related to K-12 education

In [5]:
# keep only english tweets
tweets_en = tweets_raw.filter(tweets_raw.lang == 'en')

In [6]:
# Remove non-English characters, convert to lowercase and remove whitespace
tweets_en = tweets_en\
    .withColumn("tweet_text", regexp_replace("tweet_text", "[^a-zA-Z0-9\\s]", ""))\
    .withColumn("tweet_text", lower("tweet_text"))\
    .withColumn("tweet_text", regexp_replace("tweet_text", "\\s+", " "))

In [9]:
# define the keywords to include
keywords = ["education", "school", "university", "learning", "knowledge", "kindergarten", "12th grade", "primary education", "secondary education", "college", "k-12", 
            "classroom", "middle school", "preschool", "class","degree", "marksheet", "curriculum", "tuition", "scholarship", "junior school","senior school", "academic", 
            "primary school", "elementary school", "high school", "syllabus", "assessment", "tutoring", "stem education", "vocational education","homework", "online learning",
            "pedagogy", "classwork", "textbook", "e-learning", "report cards", "public school", "private school", "teacher", "student", "parent-teacher conferences", "school counselling",
            "school board", "literacy", "sophomore", "educational institution", "curricula", "twelfth grade", "first grade", "second grade", "seventh grade", "fifth grade", "ninth grade",
            "fourth grade", "eleventh grade", "third grade", "eighth grade", "sixth grade", "tenth grade", "educational"]

# keep the tweets related to k-12
filtered_tweets = tweets_en.filter(col('text').rlike('|'.join(keywords)))

# check the record count
filtered_tweets.count()

                                                                                

58482831

### Save the filtered data

In [None]:
filtered_tweets.write.format("parquet").\
mode('overwrite').\
save('gs://msca-bdp-students-bucket/shared_data/sshende/filtered_data')

                                                                                

In [1]:
# load the filtered data into spark dataframe
tweets = spark.read.parquet('gs://msca-bdp-students-bucket/shared_data/sshende/filtered_data')

23/03/06 07:47:58 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [2]:
tweets.printSchema()

root
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |   

### Keeping only the features required for the analysis

In [10]:
processed_tweets = tweets.select('created_at', 'text', 'user.id', 'user.id_str', 'user.screen_name', 'user.description', 'user.verified', 'place.country', 'place.bounding_box',
                               'quote_count','reply_count','retweeted_status.retweet_count','favorite_count','user.followers_count', 'user.statuses_count', 
                               'retweeted_status', 'quoted_status')
processed_tweets

                                                                                

created_at,text,id,id_str,screen_name,description,verified,country,bounding_box,quote_count,reply_count,retweet_count,favorite_count,followers_count,statuses_count,retweeted_status,quoted_status
Mon May 23 21:32:...,RT @imoteda: Hell...,126655602,126655602,TheMonijesu,Founder : @cradle...,False,,,0,0,77.0,0,1134,62091,"{null, Mon May 23...",
Mon May 23 21:32:...,RT @MPTigerPride:...,808774246654705664,808774246654705664,Coach_Crosby_44,Head Football Coa...,False,,,0,0,4.0,0,843,3204,"{null, Mon May 23...",
Mon May 23 21:32:...,Emotional well-be...,100607528,100607528,Revcynthia,Author + Speaker ...,False,,,0,0,,0,3523,9185,,
Mon May 23 21:32:...,RT @JDCocchiarell...,303083124,303083124,AldojaKush,"Ask about me, Lea...",False,,,0,0,5731.0,0,261,59905,"{null, Sun May 22...",
Mon May 23 21:32:...,@laurenboebert If...,292456757,292456757,jrislate,#fightfordemocracy,False,,,0,0,,0,15849,21970,,
Mon May 23 21:32:...,Large percentage ...,1050650108285472768,1050650108285472768,AntaraEnglish,English news serv...,False,,,0,0,,0,1299,32184,,
Mon May 23 21:32:...,RT @ilovemypets56...,1480926799500619776,1480926799500619776,Coupdetat231,#FBPE. Pro EU. Se...,False,,,0,0,490.0,0,250,3177,"{null, Mon May 23...",
Mon May 23 21:32:...,need to sleep bec...,1281047731822354433,1281047731822354433,K00_CORE,currently busy wi...,False,,,0,0,,0,3097,58866,,
Mon May 23 21:32:...,RT @SusanPersever...,3262844619,3262844619,SuzanneSpsjess,"Wife, mom, teache...",False,,,0,0,9987.0,0,11097,138473,"{null, Mon May 23...",
Mon May 23 21:32:...,RT @hoejackboresm...,392804683,392804683,thegoodkid99,A Kendrick Lamar ...,False,,,0,0,9354.0,0,446,44152,"{null, Sun May 22...","{null, Sat May 21..."


In [11]:
print("Number of rows:", processed_tweets.count())
print("Number of columns:", len(processed_tweets.columns))



Number of rows: 58482831
Number of columns: 17


                                                                                

Now, the processed data has 58482831 tweets and 17 features.

### Save the processed data

In [12]:
processed_tweets.write.format("parquet").\
mode('overwrite').\
save('gs://msca-bdp-students-bucket/shared_data/sshende/processed_data')

                                                                                

We will use this data for further analysis.