In [1]:
!pip install -r requirements.txt

Collecting pyspark (from -r requirements.txt (line 1))
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:03[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting kaggle (from -r requirements.txt (line 2))
  Downloading kaggle-1.6.6.tar.gz (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.6/84.6 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m-:--:--[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark->-r requirements.txt (line 1)

In [16]:
import json
import os
import zipfile
import subprocess
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, count, last, lag, row_number, substring_index, input_file_name, regexp_extract, expr, concat_ws, map_from_entries, explode, struct
from pyspark.sql.window import Window

In [4]:
def init_on_kaggle(username, api_key):
    KAGGLE_CONFIG_DIR = os.path.join(os.path.expandvars('$HOME'), '.kaggle')
    os.makedirs(KAGGLE_CONFIG_DIR, exist_ok = True)
    api_dict = {
        "username": username, 
        "key": api_key
    } 
    with open(f"{KAGGLE_CONFIG_DIR}/kaggle.json", "w", encoding='utf-8') as f:
        json.dump(api_dict, f)
    cmd = f"chmod 600 {KAGGLE_CONFIG_DIR}/kaggle.json"
    output = subprocess.check_output(cmd.split(" "))
    output = output.decode(encoding='UTF-8')
    print(output)

init_on_kaggle('testexerciseuser', '1ab7db4952d8bca38e7357e5d4dbdd35')




In [14]:
import kaggle

datasets = kaggle.api.dataset_list(search="coronavirus-covid19-tweets-early-april")

kaggle.api.dataset_download_files(vars(datasets[0])['ref'], path="csv/")

with zipfile.ZipFile(f'csv/{vars(datasets[0])["ref"].split("/")[1]}.zip', 'r') as zip_ref:
    zip_ref.extractall('csv')




In [2]:
sp = SparkSession.builder.getOrCreate()

24/03/25 01:37:08 WARN Utils: Your hostname, kWs-MacBook-Pro-2019.local resolves to a loopback address: 127.0.0.1; using 192.168.1.205 instead (on interface en0)
24/03/25 01:37:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/25 01:37:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
src_df = sp.read\
   .option("escapeQuotes", "true")\
   .option("multiLine", "true")\
   .csv(
      'csv/2020-03-29 Coronavirus Tweets.CSV', 
      header=True,
      quote='"',
      escape='\"',
   )\
   .withColumn(
      "tweet_date", 
      regexp_extract(input_file_name(), "/(\d{4}-\d{2}-\d{2})", 1)
   )\
   .withColumn('hashtags', expr(r"regexp_extract_all(text, '(#\\w+)', 0)") )

src_df.show()

                                                                                

+-------------------+-------------------+--------------------+---------------+--------------------+--------------------+------------------+----------------+--------------------+--------+----------+----------------+-------------+------------+---------------+----------+---------------+-------------+------------+--------------------+--------+----+----------+--------------------+
|          status_id|            user_id|          created_at|    screen_name|                text|              source|reply_to_status_id|reply_to_user_id|reply_to_screen_name|is_quote|is_retweet|favourites_count|retweet_count|country_code|place_full_name|place_type|followers_count|friends_count|account_lang|  account_created_at|verified|lang|tweet_date|            hashtags|
+-------------------+-------------------+--------------------+---------------+--------------------+--------------------+------------------+----------------+--------------------+--------+----------+----------------+-------------+------------+-

In [100]:
# q1

derived_df1 = src_df.select(
   "tweet_date",
   "user_id",
   "hashtags",
   "source",
   "screen_name",
   "created_at",
).withColumn(
   "screen_name", 
   last("screen_name")\
      .over(Window.partitionBy("user_id", "tweet_date").orderBy(col("created_at").desc()))
)

derived_df1.show()

+----------+-------------------+--------------------+--------------------+---------------+--------------------+
|tweet_date|            user_id|            hashtags|              source|    screen_name|          created_at|
+----------+-------------------+--------------------+--------------------+---------------+--------------------+
|2020-03-29| 860252856829587457|[#Prevenci, #Coro...|           TweetDeck|   IMSS_SanLuis|2020-03-29T00:00:00Z|
|2020-03-29|1125933654943895553|[#ATENCI, #Corona...|           TweetDeck|     intrac_ccs|2020-03-29T00:00:00Z|
|2020-03-29|           80943559|[#minneapolis, #m...|           TweetDeck|       rlieving|2020-03-29T00:00:00Z|
|2020-03-29| 817072420947247104|[#IMSS, #SanaDist...|           TweetDeck|   Tu_IMSS_Coah|2020-03-29T00:00:00Z|
|2020-03-29| 788863557349670913|[#Coronavirus, #C...|           TweetDeck|   Tabasco_IMSS|2020-03-29T00:00:00Z|
|2020-03-29|          132225222|[#Inf, #Conferenc...|           TweetDeck|      SSalud_mx|2020-03-29T00:

In [121]:

derived_df1 = derived_df1.groupBy(
   "tweet_date",
   "user_id",
).agg(
   count("*").alias("num_tweets"),
   F.max("screen_name").alias("screen_name"),
   collect_list("source").alias("source_list"),
   collect_list("hashtags").alias("hashtags"),
).withColumn(
   "source_count", 
   F.size("source_list")
).withColumn(
   "tweet_sources", 
   map_from_entries(
      expr("transform(array_distinct(source_list), x -> struct(x, size(filter(source_list, y -> y = x))))")
   )
).withColumn(
   "hashtags",
   explode("hashtags")
)

derived_df1 = derived_df1.select(
   "tweet_date",
   "user_id",
   "num_tweets",
   "hashtags",
   "tweet_sources",
   "screen_name",
)

derived_df1.show()

[Stage 133:>                                                        (0 + 1) / 1]

+----------+-------------------+----------+--------------------+--------------------+---------------+
|tweet_date|            user_id|num_tweets|            hashtags|       tweet_sources|    screen_name|
+----------+-------------------+----------+--------------------+--------------------+---------------+
|2020-03-29|         1000002758|         1|          [#COVID19]|{Twitter for Andr...| dreams_tiffany|
|2020-03-29|1000044336103469057|         1|[#Cristo, #cristo...|{Twitter for iPho...|       ByteDeFe|
|2020-03-29|1000051189986177025|         1|[#Covid_19, #Supp...|{Twitter for Andr...|     castjunkie|
|2020-03-29|1000051619009003522|         1|[#lockdown, #COVI...|{Twitter for Andr...|affordablelawng|
|2020-03-29|1000128817690705921|         2|         [#Covid_19]|{Twitter for Andr...|        AjammaS|
|2020-03-29|1000128817690705921|         2|      [#coronavirus]|{Twitter for Andr...|        AjammaS|
|2020-03-29|1000138722795491328|         1|          [#COVID19]|{Twitter for Andr.

                                                                                

In [21]:
derived_df2_quote = src_df\
.filter(col("is_quote") == "TRUE")\
.select(
    "tweet_date",
    "user_id",
    "reply_to_user_id",
    "reply_to_status_id",
    "created_at",
)\
.withColumnRenamed("user_id", "reply_user_id")\
.withColumnRenamed("tweet_date", "reply_date")\
.withColumnRenamed("reply_to_user_id", "original_user_id")\
.withColumnRenamed("created_at", "reply_at")


derived_df2_quote.show()

+----------+-------------------+-------------------+-------------------+--------------------+
|reply_date|      reply_user_id|   original_user_id| reply_to_status_id|            reply_at|
+----------+-------------------+-------------------+-------------------+--------------------+
|2020-03-29|           26663983|           67038807|1243928169243123712|2020-03-29T00:00:01Z|
|2020-03-29| 705880882955968512|               NULL|               NULL|2020-03-29T00:00:01Z|
|2020-03-29|1015324973051899904|               NULL|               NULL|2020-03-29T00:00:03Z|
|2020-03-29|          322312808|               NULL|               NULL|2020-03-29T00:00:03Z|
|2020-03-29|1208463505654812672|               NULL|               NULL|2020-03-29T00:00:05Z|
|2020-03-29| 880962856467877888|               NULL|               NULL|2020-03-29T00:00:05Z|
|2020-03-29|          139283160|               NULL|               NULL|2020-03-29T00:00:06Z|
|2020-03-29|          205649776|               NULL|        

In [29]:
derived_df2 = derived_df2_quote.alias("df1").join(
   src_df.select(
      "user_id",
      "created_at",
      "status_id",
   ).alias("df2"),
   col("df1.reply_to_status_id") == col("df2.status_id"),
   "left"
).withColumn(
   'reply_delay',
   F.when(
      col("status_id").isNotNull(),
      col("reply_at").cast("timestamp") - col("created_at").cast("timestamp"),
   )
).withColumn(
   "tweet_number", 
   F.when(
      col("status_id").isNotNull(), 
      expr("row_number() OVER (PARTITION BY reply_to_status_id ORDER BY reply_at)")
   )
)

derived_df2 = derived_df2\
.filter(col('status_id').isNotNull())\
.select(
   "reply_date",
   "reply_user_id",
   "original_user_id",
   "reply_delay",
   "tweet_number"
)

derived_df2.show()

[Stage 90:>                                                         (0 + 1) / 1]

+----------+-------------------+-------------------+--------------------+------------+
|reply_date|      reply_user_id|   original_user_id|         reply_delay|tweet_number|
+----------+-------------------+-------------------+--------------------+------------+
|2020-03-29|          528479211|          528479211|INTERVAL '0 00:23...|           1|
|2020-03-29|1089269762088157184|           68297567|INTERVAL '0 00:02...|           1|
|2020-03-29|           57049566|           52544275|INTERVAL '0 06:06...|           1|
|2020-03-29|1183474645414154240|1183474645414154240|INTERVAL '0 00:07...|           1|
|2020-03-29|          493434642|          493434642|INTERVAL '0 00:07...|           1|
|2020-03-29|           23023227|           23023227|INTERVAL '0 00:05...|           1|
|2020-03-29|         1067293117|         1067293117|INTERVAL '0 00:01...|           1|
|2020-03-29|1195054295999074304|         2854166772|INTERVAL '0 03:29...|           1|
|2020-03-29|          888027091|          8

                                                                                

In [None]:
derived_df2.filter(col('user_id').isNotNull()).show()

In [30]:
# Q3

In [38]:
derived_df3 = src_df.select(
   "user_id", 
   "screen_name", 
   "created_at"
).withColumn(
   "old_screen_name",
   F.lag("screen_name", 1).over(
      Window.partitionBy("user_id").orderBy(col("created_at").asc())
   )
).filter(
   col("old_screen_name") != col("screen_name")
).withColumn(
   "change_date",
   F.substring("created_at", 1, 10)
).drop("created_at")

derived_df3.show()

[Stage 112:>                                                        (0 + 1) / 1]

+-------------------+---------------+---------------+-----------+
|            user_id|    screen_name|old_screen_name|change_date|
+-------------------+---------------+---------------+-----------+
|1189542607245852673|      mskltccrc|HI66iiMDHyzU8oM| 2020-03-29|
|1228380016519647235|IprQa_AcPqIlIzx|ipr_ioZvQaPlIIb| 2020-03-29|
|1238870792500391936|     StoicUnity|   Tu_mamA_mama| 2020-03-29|
|1243516299067772928|  Infosglobales| InfosDigitales| 2020-03-29|
|1244367538529669120| BlackmanAndR88|      KanyeWI89| 2020-03-29|
| 907725168742735872|     gift_adene|    Real_Moses1| 2020-03-29|
| 907725168742735872|    Iam_Joshua5|     gift_adene| 2020-03-29|
|1092355973975552000|Engr_Azhar_Shah|        EazharS| 2020-03-29|
|1110883404491444225|PurpleIsPowrful|ruthforcongress| 2020-03-29|
|1192900241613377539| ImForAllHumans|   Michael4Yang| 2020-03-29|
|1224792058210267136|  AmiRFaRooq787|  AmiRFaRooq789| 2020-03-29|
|1229771933518045184|AlbaBarrenetxea|   Alba02208797| 2020-03-29|
|         

                                                                                