In [1]:
import os
import subprocess
from pyspark.sql.functions import *
from pyspark.sql.types import *

import shutil
import pandas as pd

pd.set_option("max_colwidth", 100)
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

from google.cloud import storage

# Import & Inspecting Data

In [2]:
directory = 'gs://msca-bdp-tweets/final_project/'
path = directory

In [3]:
cmd = 'hadoop fs -du -s -h ' + directory

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
for line in p.stdout.readlines():
    print (line)

retval = p.wait()

156.2 G  156.2 G  gs://msca-bdp-tweets/final_project



In [3]:
%%time

tweets_df = spark.read.json(path)

CPU times: user 36.5 ms, sys: 18.6 ms, total: 55.1 ms
Wall time: 4min 13s


In [5]:
# %%time

# tweets_df.count()

In [6]:
# tweets_df.printSchema()

In [8]:
tweets_df.columns

['contributors',
 'coordinates',
 'created_at',
 'display_text_range',
 'entities',
 'extended_entities',
 'extended_tweet',
 'favorite_count',
 'favorited',
 'filter_level',
 'geo',
 'id',
 'id_str',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id',
 'in_reply_to_user_id_str',
 'is_quote_status',
 'lang',
 'place',
 'possibly_sensitive',
 'quote_count',
 'quoted_status',
 'quoted_status_id',
 'quoted_status_id_str',
 'quoted_status_permalink',
 'reply_count',
 'retweet_count',
 'retweeted',
 'retweeted_status',
 'source',
 'text',
 'timestamp_ms',
 'truncated',
 'user',
 'withheld_in_countries']

In [26]:
# %%time

# tweets_df.select([count(when(col(c).isNull(), c)).alias(c) for c in tweets_df.columns]).show(vertical=True)

In [None]:
# tweets_df.show(1, vertical=True)

# Cleaning Data
#### - Remove the rows with very high null counts
#### - Removing other cols that shouldnt influence analysis
#### - Merge extended_tweet Fields (no need for truncated anymore)
#### - Looking into schema and removing duplicated fields (data that is both in top level and contained in strucs)
#### - Split timestamp column into separate fields and drop

In [9]:
drop_cols = ('contributors','coordinates','display_text_range','extended_entities','withheld_in_countries',\
             'lang','possibly_sensitive','in_reply_to_screen_name','in_reply_to_status_id_str','favorited',\
             'in_reply_to_user_id_str','quoted_status_id_str','quoted_status_permalink','source','id_str')

tweets_df1 = tweets_df.drop(*drop_cols)

tweets_df2 = tweets_df1.withColumn("twt_text_full",when((col("truncated") == True) & (col("extended_tweet").isNotNull() == True), col('extended_tweet.full_text')).otherwise(col("text")))\
                       .withColumn("twt_hashtags",when((col("truncated") == True) & (col("extended_tweet").isNotNull() == True), col('extended_tweet.entities.hashtags.text')).otherwise(col("entities.hashtags.text")))\
                       .withColumn("twt_country", col("place.country_code"))\
                       .withColumn("twt_location_full", col("place.full_name"))\
                       .withColumn("twt_location", col("place.name"))\
                       .withColumn("twt_location_type", col("place.place_type"))\
                       .withColumn("ext_rt_id",col('retweeted_status.id_str'))\
                       .withColumn("ext_rt_user_id",col('retweeted_status.user.id'))\
                       .withColumn("ext_qt_user_id",col('quoted_status.user.id'))\
                       .withColumn("usr_id", col("user.id"))\
                       .withColumn("usr_name", col("user.screen_name"))\
                       .withColumn("usr_location", col("user.location"))\
                       .withColumn("usr_followers", col("user.followers_count"))\
                       .withColumn("usr_tweet_count",col('user.statuses_count'))\
                       .withColumn("usr_verified",col('user.verified'))\
                       .withColumn("usr_desc",col('user.description'))\
                       .withColumn("created_timestamp",from_unixtime(round(col("timestamp_ms")/1000)))

tweets_df2 = tweets_df2.withColumn("dt_year", year(col("created_timestamp")))\
                       .withColumn("dt_month", month(col("created_timestamp")))\
                       .withColumn("dt_day", dayofmonth(col("created_timestamp")))\
                       .withColumn("dt_hour", hour(col("created_timestamp")))\
                       .withColumn("dt_date", to_date(col("created_timestamp")))\
                       .withColumn("dt_datehour",date_trunc("hour", col("created_timestamp")))

drop_cols = ('entities','extended_tweet','geo','place','quoted_status','retweeted_status','user','text',\
             'truncated','is_quote_status','retweeted','timestamp_ms','created_at'\
             'reply_count','retweet_count','quote_count','created_timestamp')

tweets_df3 = tweets_df2.drop(*drop_cols)


In [10]:
tweets_df3 = tweets_df3.withColumnRenamed("in_reply_to_status_id","ext_rp_id")\
                       .withColumnRenamed("quoted_status_id","ext_qt_id")\
                       .withColumnRenamed("id","twt_id")\
                       .withColumnRenamed("filter_level","twt_importance")\
                       .withColumnRenamed("favorite_count","twt_likes")\
                       .withColumnRenamed("in_reply_to_user_id","ext_rp_user_id")

#### Retwt, reply, and quote counts are 0 since the tweets are grabbed at the time of api
#### going to make a dataframe of counts of retweeted/quoted/replied tweets and merge with the original dataframe

In [11]:
rp_df = tweets_df3.groupBy('ext_rp_id').agg(count('*').alias('twt_reply_count'))
rt_df = tweets_df3.groupBy('ext_rt_id').agg(count('*').alias('twt_retwt_count'))
qt_df = tweets_df3.groupBy('ext_qt_id').agg(count('*').alias('twt_quote_count'))

tweets_df3 = tweets_df3.join(rp_df.withColumnRenamed('ext_rp_id','twt_id'), ['twt_id'], how='left')
tweets_df3 = tweets_df3.join(rt_df.withColumnRenamed('ext_rt_id','twt_id'), ['twt_id'], how='left')
tweets_df3 = tweets_df3.join(qt_df.withColumnRenamed('ext_qt_id','twt_id'), ['twt_id'], how='left')

tweets_df3 = tweets_df3.na.fill({"twt_reply_count": 0,
                                 "twt_retwt_count": 0,
                                 "twt_quote_count": 0})

# Finding Relevant Tweets

### 

In [12]:
keywords = ['corona', 'covid', 'sars-cov-2', 'viral', 'wuhan virus', 'china virus',
            'delta', 'omicron', 'variant', 'vaccin','booster',
            'moderna','niaid','pfizer','biontech','johnson','j&j',
            'sinovac','novavax','az','astrazeneca','janssen', 'comirnaty',
           ]

In [13]:
tweets_df_relevant = tweets_df3.select("*", lower(col('twt_text_full')).alias('twt_text'))

tweets_df_relevant = tweets_df_relevant.where(tweets_df_relevant['twt_text'].rlike("|".join(["(" + kw + ")" for kw in keywords])))

tweets_df_relevant = tweets_df_relevant.drop('twt_text_full')

tweets_df_relevant = tweets_df_relevant.withColumn("is_original",when(((col("ext_qt_id").isNull()) & 
                                                                        (col("ext_rt_id").isNull()) & 
                                                                        (col("ext_rp_id").isNull())), 1).otherwise(0))

tweets_df_relevant = tweets_df_relevant.select(sorted(tweets_df_relevant.columns))

# intial_relevant_count = tweets_df_relevant.count()
# print(intial_relevant_count)

In [14]:
tweets_df_remaining = tweets_df3.select("*", lower(col('twt_text_full')).alias('twt_text'))

tweets_df_remaining = tweets_df_remaining.where(~tweets_df_remaining['twt_text'].rlike("|".join(["(" + kw + ")" for kw in keywords])))

tweets_df_remaining = tweets_df_remaining.drop('twt_text_full')

tweets_df_remaining = tweets_df_remaining.withColumn("is_original", lit(0))

tweets_df_remaining = tweets_df_remaining.select(sorted(tweets_df_relevant.columns))

# remaining_count = tweets_df_remaining.count()
# print(remaining_count)

In [15]:
tweets_df_rts = tweets_df_remaining.join(tweets_df_relevant.withColumn("ext_rt_id", col("twt_id")), ['ext_rt_id'], 'leftsemi')\
                                   .withColumn("is_original", lit(0))\
                                   .select(sorted(tweets_df_relevant.columns))

# relevant_retweet_count = tweets_df_rts.count()
# print(relevant_retweet_count)

In [16]:
tweets_df_qts = tweets_df_remaining.join(tweets_df_relevant.withColumn("ext_qt_id", col("twt_id")), ['ext_qt_id'], 'leftsemi')\
                                   .withColumn("is_original", lit(0))\
                                   .select(sorted(tweets_df_relevant.columns))

# relevant_quote_count = tweets_df_qts.count()
# print(relevant_quote_count)

In [17]:
tweets_df_rps1 = tweets_df_remaining.join(tweets_df_relevant.withColumn("ext_rp_id", col("twt_id")), ['ext_rp_id'], 'leftsemi')\
                                    .withColumn("is_original", lit(0))\
                                    .select(sorted(tweets_df_relevant.columns))

# relevant_reply_count1 = tweets_df_rps1.count()
# print(relevant_reply_count1)

In [18]:
tweets_df_rps2 = tweets_df_remaining.join(tweets_df_rps1.withColumn("ext_rp_id", col("twt_id")), ['ext_rp_id'], 'leftsemi')\
                                    .withColumn("is_original", lit(0))\
                                    .select(sorted(tweets_df_relevant.columns))

# relevant_reply_count2 = tweets_df_rps2.count()
# print(relevant_reply_count2)

In [19]:
tweets_df_rps3 = tweets_df_remaining.join(tweets_df_rps2.withColumn("ext_rp_id", col("twt_id")), ['ext_rp_id'], 'leftsemi')\
                                    .withColumn("is_original", lit(0))\
                                    .select(sorted(tweets_df_relevant.columns))

# relevant_reply_count3 = tweets_df_rps3.count()
# print(relevant_reply_count3)

In [20]:
tweets_df_relevant = tweets_df_relevant.union(tweets_df_rts)\
                                       .union(tweets_df_qts)\
                                       .union(tweets_df_rps1)\
                                       .union(tweets_df_rps2)\
                                       .union(tweets_df_rps3)

tweets_df_relevant = tweets_df_relevant.drop_duplicates()

In [21]:
drop_cols = ('ext_qt_id','ext_rp_id','ext_rp_id')

tweets_df_clean = tweets_df_relevant.drop(*drop_cols)

In [22]:
tweets_df_clean.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- dt_date: date (nullable = true)
 |-- dt_datehour: timestamp (nullable = true)
 |-- dt_day: integer (nullable = true)
 |-- dt_hour: integer (nullable = true)
 |-- dt_month: integer (nullable = true)
 |-- dt_year: integer (nullable = true)
 |-- ext_qt_user_id: long (nullable = true)
 |-- ext_rp_user_id: long (nullable = true)
 |-- ext_rt_id: string (nullable = true)
 |-- ext_rt_user_id: long (nullable = true)
 |-- is_original: integer (nullable = false)
 |-- reply_count: long (nullable = true)
 |-- twt_country: string (nullable = true)
 |-- twt_hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- twt_id: long (nullable = true)
 |-- twt_importance: string (nullable = true)
 |-- twt_likes: long (nullable = true)
 |-- twt_location: string (nullable = true)
 |-- twt_location_full: string (nullable = true)
 |-- twt_location_type: string (nullable = true)
 |-- twt_quote_count: long (nullable = false)
 |-- twt

# Saving to GCS Bucket

In [23]:
%%time

# Delete folder from COS bucket
def delete_folder(bucket_name, folder_name):
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))

    for blob in blobs:
        blob.delete()

delete_folder('msca-bdp-students-bucket', 'shared_data/kaihayden/final1')


CPU times: user 555 ms, sys: 41.5 ms, total: 596 ms
Wall time: 8.21 s


In [None]:
%%time

tweets_df_clean.write.format("parquet")\
               .mode('overwrite')\
               .save('gs://msca-bdp-students-bucket/shared_data/kaihayden/final1')

CPU times: user 422 ms, sys: 184 ms, total: 606 ms
Wall time: 58min 13s


In [27]:
%%time

tweets_df.select([count(when(col(c).isNull(), c)).alias(c) for c in tweets_df.columns]).show(vertical=True)

-RECORD 0-----------------------------
 contributors              | 25191000 
 coordinates               | 25188865 
 created_at                | 0        
 display_text_range        | 21497511 
 entities                  | 0        
 extended_entities         | 24418633 
 extended_tweet            | 20859238 
 favorite_count            | 0        
 favorited                 | 0        
 filter_level              | 0        
 geo                       | 25188865 
 id                        | 0        
 id_str                    | 0        
 in_reply_to_screen_name   | 21858171 
 in_reply_to_status_id     | 22009299 
 in_reply_to_status_id_str | 22009299 
 in_reply_to_user_id       | 21858171 
 in_reply_to_user_id_str   | 21858171 
 is_quote_status           | 0        
 lang                      | 0        
 place                     | 25051534 
 possibly_sensitive        | 20548189 
 quote_count               | 0        
 quoted_status             | 19958447 
 quoted_status_id        