### AT3 SINA
### Extract the tweets from the data sources


1. Download the original dataset from https://data.mendeley.com/datasets/7ph4nx8hnc/1 (the link is given in the last section of the paper: https://www.sciencedirect.com/science/article/pii/S2352340920305783) and store the files in a folder (this notebook uses sina as the folder name)
2. Run a Docker container with Spark
3. Run this notebook to produce 5 files:
   a. list of tweet IDs (covid_tweets.csv); make sure that the column name is tweet_id
   b. list of edges (network) based on mention (edge_mention.csv)
   c. list of edges (network) based on retweet (edge_retweet.csv)
   d. list of edges (network) based on reply (edge_reply.csv)
   e. list of edges (network) based on the combination of mention, retweet, and reply (edge_all.csv)


(Execute the hydrate script below to download the tweets)
#### Hydrate
1. Create a Twitter API key at https://developer.twitter.com/en/apps by creating an app
2. Copy the consumer_key, consumer_secret, access_token, and access_token_secret into api_keys.json
3. Download get_metadata.py from https://github.com/thepanacealab/SMMT/tree/master/data_acquisition and save it in the same folder as covid_tweets.csv
4. Open a command prompt/terminal and run:
python get_metadata.py -i covid_tweets.csv -o hydrated_covid_tweets -k api_keys.json
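Step 2 of the hydrate instructions can be scripted. The sketch below writes the four credentials named above into a JSON file. The flat one-object layout and the helper name `write_api_keys` are assumptions made for illustration; check the SMMT repository for the exact layout that get_metadata.py expects.

```python
import json
from pathlib import Path

# Key names are taken from the hydrate steps; the flat JSON layout is an assumption.
REQUIRED_KEYS = (
    "consumer_key",
    "consumer_secret",
    "access_token",
    "access_token_secret",
)


def write_api_keys(path, **credentials):
    """Write the four Twitter credentials to `path` as a flat JSON object."""
    missing = [key for key in REQUIRED_KEYS if not credentials.get(key)]
    if missing:
        raise ValueError("missing credentials: " + ", ".join(missing))
    # Keep only the expected keys, in a fixed order.
    payload = {key: credentials[key] for key in REQUIRED_KEYS}
    Path(path).write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return payload
```

A call such as `write_api_keys("api_keys.json", consumer_key="...", consumer_secret="...", access_token="...", access_token_secret="...")` would then create the file next to get_metadata.py. Keep this file out of version control, since it holds secrets.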


In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import IntegerType

In [2]:
spark_sina = SparkSession\
                .builder\
                .appName('SINA') \
                .master("local[*]") \
                .getOrCreate()

## Data Load

In [6]:
#list of tweet IDs with creation date and language
df_tweet_date_lang = \
                spark_sina\
                .read \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .csv("sina/01.Tw.Date.Lang.csv")

In [3]:
#list of relationship between nodes
df_tweet_edges = \
                spark_sina\
                .read \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .csv("sina/07a.Tw.edges.csv")

In [4]:
#list of nodes
df_tweet_nodes = \
                spark_sina\
                .read \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .csv("sina/07b.Tw.nodes.csv")

In [16]:
df_tweet_date_lang.printSchema()

root
 |-- status_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- lang: string (nullable = true)



In [25]:
df_tweet_edges.printSchema()

root
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- status_id: string (nullable = true)
 |-- width: integer (nullable = true)



In [26]:
df_tweet_nodes.printSchema()

root
 |-- name: string (nullable = true)



In [27]:
df_tweet_date_lang.show()

+--------------------+-------------------+----+
|           status_id|         created_at|lang|
+--------------------+-------------------+----+
|x1216747185565507585|2020-01-13 15:41:49|  en|
|x1216754859292184577|2020-01-13 16:12:19|  en|
|x1216763045181034497|2020-01-13 16:44:50|  en|
|x1216809242885267456|2020-01-13 19:48:25|  en|
|x1216830795672948741|2020-01-13 21:14:03|  en|
|x1216936785617416192|2020-01-14 04:15:13|  ja|
|x1216955602607509505|2020-01-14 05:30:00|  en|
|x1216964926226518022|2020-01-14 06:07:03|  en|
|x1216991240115245057|2020-01-14 07:51:36|  en|
|x1217027488137826304|2020-01-14 10:15:39|  zh|
|x1217038653194674176|2020-01-14 11:00:01|  en|
|x1217043229427761152|2020-01-14 11:18:12|  en|
|x1217054498994024448|2020-01-14 12:02:58|  en|
|x1217087723116355585|2020-01-14 14:15:00|  en|
|x1217097034500657152|2020-01-14 14:52:00|  es|
|x1217105359044673536|2020-01-14 15:25:04|  zh|
|x1217118941434142721|2020-01-14 16:19:03|  en|
|x1217126105506820096|2020-01-14 16:47:3

In [28]:
df_tweet_edges.show()

+--------------+---------------+-----+--------------------+-----+
|          from|             to| Type|           status_id|width|
+--------------+---------------+-----+--------------------+-----+
|  DrNancyM_CDC|         CDCgov|2. MT|x1216747185565507585|    1|
|           WHO|        WHOWPRO|4. Re|x1217027488137826304|    1|
|   CDCDirector|         CDCgov|2. MT|x1217126105506820096|    1|
|           WHO|        pr_moph|2. MT|x1217151178884222976|    1|
|HelenBranswell|            WHO|2. MT|x1217191264858206209|    1|
|            UN|            WHO|2. MT|x1217265645529837568|    1|
|     QuickTake|       business|2. MT|x1217741070643027969|    1|
|           WHO|    WHOThailand|4. Re|x1217787666072293376|    1|
|           WHO|        WHOKobe|4. Re|x1217787666072293376|    1|
|           WHO|       WHOSEARO|4. Re|x1217787666072293376|    1|
|           WHO|        WHOWPRO|4. Re|x1217787666072293376|    1|
|           WHO|      MHLWitter|2. MT|x1217898691375370242|    1|
|      mar

In [7]:
#get the earliest and latest dates available
df_tweet_date_lang.groupBy()\
        .agg(F.min(F.col('created_at')).alias('min_date'),
             F.max(F.col('created_at')).alias('max_date'))\
        .show()

+-------------------+-------------------+
|           min_date|           max_date|
+-------------------+-------------------+
|2020-01-13 15:41:49|2020-02-12 23:59:59|
+-------------------+-------------------+



In [9]:
#get the latest data only (2 days)
df_tweet_filtered = \
        df_tweet_date_lang.filter((F.col('lang') == 'en') & 
                                  (F.col('created_at') >= '2020-02-11 00:00:00') &
                                  (F.col('created_at') <= '2020-02-12 23:59:59'))

In [10]:
df_tweet_filtered.count()

582832

In [31]:
df_tweet_filtered.show()

+--------------------+-------------------+----+
|           status_id|         created_at|lang|
+--------------------+-------------------+----+
|x1226657027260370944|2020-02-10 00:00:00|  en|
|x1226657027499450368|2020-02-10 00:00:00|  en|
|x1226657027587530752|2020-02-10 00:00:00|  en|
|x1226657027717509120|2020-02-10 00:00:00|  en|
|x1226657027721695232|2020-02-10 00:00:00|  en|
|x1226657027730046977|2020-02-10 00:00:00|  en|
|x1226657028258619392|2020-02-10 00:00:00|  en|
|x1226657028325748737|2020-02-10 00:00:00|  en|
|x1226657028501843968|2020-02-10 00:00:00|  en|
|x1226657029172817920|2020-02-10 00:00:00|  en|
|x1226657029172932608|2020-02-10 00:00:00|  en|
|x1226657029298716680|2020-02-10 00:00:00|  en|
|x1226657029323972608|2020-02-10 00:00:00|  en|
|x1226657029831434240|2020-02-10 00:00:00|  en|
|x1226657030129246208|2020-02-10 00:00:00|  en|
|x1226657030271832065|2020-02-10 00:00:00|  en|
|x1226657030481620992|2020-02-10 00:00:00|  en|
|x1226657030531883008|2020-02-10 00:00:0

In [13]:
#get the edges for the latest tweets only

df_tweet_edges_filtered = df_tweet_edges.join(df_tweet_filtered, on = 'status_id')

In [14]:
df_tweet_edges_filtered.count()

529608

In [55]:
df_tweet_edges_filtered.show()

+--------------------+---------------+--------------+-----+-----+-------------------+----+
|           status_id|           from|            to| Type|width|         created_at|lang|
+--------------------+---------------+--------------+-----+-----+-------------------+----+
|x1227019622374789120|       bsumit83|     AskAnshul|3. RT|    1|2020-02-11 00:00:49|  en|
|x1227019670852472832| TruthJusticeRt|  viamediainfo|3. RT|    1|2020-02-11 00:01:01|  en|
|x1227019692100870144|     Be_Kinderr| ChrisMurphyCT|3. RT|    1|2020-02-11 00:01:06|  en|
|x1227022879977607168|   thinkitsayit| SenanGeraghty|4. Re|    1|2020-02-11 00:13:46|  en|
|x1227023277224271872|  JimDavisOnAir|BBCRadioLondon|2. MT|    1|2020-02-11 00:15:20|  en|
|x1227024226844135424|        Shugy06|       BNODesk|3. RT|    1|2020-02-11 00:19:07|  en|
|x1227024262483075072|      SWIMKILLS|     RANsquawk|3. RT|    1|2020-02-11 00:19:15|  en|
|x1227024330753634305|I_Doesnt_Care_U|      IsChinar|3. RT|    1|2020-02-11 00:19:32|  en|

In [56]:
df_tweet_edges_filtered.select('Type').distinct().collect()

[Row(Type='2. MT'), Row(Type='4. Re'), Row(Type='3. RT')]

In [11]:
#remove the leading 'x' prefix from status_id (pattern anchored with ^ so only the first character is affected)
df_tweet_filtered = df_tweet_filtered\
        .withColumn('tweet_id',F.regexp_replace(F.col('status_id'), '^x', ''))
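The prefix removal can be checked outside Spark with Python's `re` module. This is a minimal sketch, and the helper name `strip_status_prefix` is hypothetical. Anchoring the pattern with `^` removes only a leading `x`. For IDs of the form `x` followed by digits, as in this dataset, the anchored and unanchored patterns give the same result.

```python
import re


def strip_status_prefix(status_id):
    """Drop a single leading 'x' from a status_id and leave the rest untouched."""
    return re.sub(r"^x", "", status_id)


# An ID in the same shape as the dataset's status_id column.
print(strip_status_prefix("x1227019414584709120"))  # 1227019414584709120

# The two patterns only differ when an 'x' appears after the first character.
print(re.sub(r"x", "", "x12x3"))   # 123  (every 'x' removed)
print(re.sub(r"^x", "", "x12x3"))  # 12x3 (only the prefix removed)
```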

In [66]:
df_tweet_filtered.show()

+--------------------+-------------------+----+-------------------+
|           status_id|         created_at|lang|           tweet_id|
+--------------------+-------------------+----+-------------------+
|x1227019414584709120|2020-02-11 00:00:00|  en|1227019414584709120|
|x1227019414739931136|2020-02-11 00:00:00|  en|1227019414739931136|
|x1227019414882570240|2020-02-11 00:00:00|  en|1227019414882570240|
|x1227019415520083968|2020-02-11 00:00:00|  en|1227019415520083968|
|x1227019415612542977|2020-02-11 00:00:00|  en|1227019415612542977|
|x1227019415650066436|2020-02-11 00:00:00|  en|1227019415650066436|
|x1227019415700561920|2020-02-11 00:00:00|  en|1227019415700561920|
|x1227019415708819457|2020-02-11 00:00:00|  en|1227019415708819457|
|x1227019415746682882|2020-02-11 00:00:00|  en|1227019415746682882|
|x1227019415859978240|2020-02-11 00:00:00|  en|1227019415859978240|
|x1227019415960608768|2020-02-11 00:00:00|  en|1227019415960608768|
|x1227019416388263936|2020-02-11 00:00:00|  en|1

In [68]:
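#save to CSV
#tweet IDs only, with a header row so the column is named tweet_id
#note: Spark writes a directory named covid_tweets.csv containing part-*.csv files, not a single file
df_tweet_filtered.select(F.col('tweet_id'))\
                .write.option("header", "true")\
                .csv('covid_tweets.csv')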
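Spark's CSV writer creates a directory (here named covid_tweets.csv) holding one part-*.csv file per partition. With the header option set, each part file repeats the header row. The hydrate step passes a single file to get_metadata.py, so the parts may need to be merged first. The sketch below does that in plain Python. The function name `merge_spark_csv` and the output file name in the usage note are assumptions for illustration.

```python
import csv
import glob
import os


def merge_spark_csv(spark_dir, out_path):
    """Concatenate Spark part files into one CSV, keeping a single header row.

    Returns the number of data rows written (header excluded).
    """
    part_files = sorted(glob.glob(os.path.join(spark_dir, "part-*.csv")))
    if not part_files:
        raise FileNotFoundError("no part-*.csv files found in " + spark_dir)

    rows_written = 0
    header_written = False
    with open(out_path, "w", newline="", encoding="utf-8") as out_file:
        writer = csv.writer(out_file, lineterminator="\n")
        for part in part_files:
            with open(part, newline="", encoding="utf-8") as in_file:
                reader = csv.reader(in_file)
                header = next(reader, None)  # every part starts with the header
                if header is None:
                    continue  # empty part file, nothing to copy
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                for row in reader:
                    writer.writerow(row)
                    rows_written += 1
    return rows_written
```

For example, `merge_spark_csv("covid_tweets.csv", "covid_tweets_merged.csv")` would produce a single file. Because the Spark output directory already occupies the name covid_tweets.csv, either pass the merged file name to the `-i` option of get_metadata.py or rename the directory before merging.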

In [70]:
#edges - based on mention
df_tweet_edges_filtered.filter(F.col('Type') == '2. MT')\
        .select(F.col('from'), F.col('to'))\
        .repartition(1).write\
        .csv('edge_mention.csv')

In [71]:
#edges - based on retweet
df_tweet_edges_filtered.filter(F.col('Type') == '3. RT')\
        .select(F.col('from'), F.col('to'))\
        .repartition(1).write\
        .csv('edge_retweet.csv')

In [72]:
#edges - based on reply
df_tweet_edges_filtered.filter(F.col('Type') == '4. Re')\
        .select(F.col('from'), F.col('to'))\
        .repartition(1).write\
        .csv('edge_reply.csv')

In [73]:
#edges - all
df_tweet_edges_filtered.select(F.col('from'), F.col('to'))\
        .repartition(1).write\
        .csv('edge_all.csv')
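The four edge cells above write no header row, so each output file holds two unnamed columns: the source account (`from`) and the target account (`to`). As a quick sanity check on an exported edge list, the sketch below counts how often each account appears as a target. The function name `count_incoming_edges` is hypothetical, and the path passed in must point to the part file inside the Spark output directory, not to the directory itself.

```python
import csv
from collections import Counter


def count_incoming_edges(edge_csv_path):
    """Count how often each account appears as the target ('to') of an edge."""
    incoming = Counter()
    with open(edge_csv_path, newline="", encoding="utf-8") as edge_file:
        for row in csv.reader(edge_file):
            if len(row) != 2:
                continue  # skip blank or malformed lines
            _source, target = row
            incoming[target] += 1
    return incoming
```

Calling `count_incoming_edges(path).most_common(10)` lists the ten most frequently targeted accounts in that file, which can be compared against a filter on the `to` column in Spark.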

In [47]:
df_tweet_edges_filtered.filter(F.col('to') == 'GordonGChang').show()

+--------------------+---------------+------------+-----+-----+-------------------+----+-------------------+
|           status_id|           from|          to| Type|width|         created_at|lang|           tweet_id|
+--------------------+---------------+------------+-----+-----+-------------------+----+-------------------+
|x1227632511041359872|EdwardGordon_M1|GordonGChang|3. RT|    1|2020-02-12 16:36:13|  en|1227632511041359872|
|x1227638030829326336|ChrisRecker_AZA|GordonGChang|3. RT|    1|2020-02-12 16:58:09|  en|1227638030829326336|
|x1227025538344247297|  Right_Realist|GordonGChang|3. RT|    1|2020-02-11 00:24:20|  en|1227025538344247297|
|x1227627759393837056|  Bondy82422070|GordonGChang|3. RT|    1|2020-02-12 16:17:20|  en|1227627759393837056|
|x1227668307043659776|      sport5105|GordonGChang|3. RT|    1|2020-02-12 18:58:28|  en|1227668307043659776|
|x1227653845469290496|   GhalipNadira|GordonGChang|3. RT|    1|2020-02-12 18:01:00|  en|1227653845469290496|
|x12277125230020526