# Challenge Resolution

## Installing dependencies

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=8e192e512d15af441f60380e6c761b864ff666b88618fa2c697be5dce86a6f10
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# Solución problema 1:


1.   Obtener las 10 fechas con más tweets
2.   Para cada una de esas fechas, mostrar el usuario que más tweets publicó



In [2]:
from pyspark.sql import SparkSession

# Create the Spark session used by every cell below, or reuse one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Latamchallenge")
    .getOrCreate()
)

In [3]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [4]:
# Load the raw tweets dump (Spark infers the schema from the JSON records).
rdata = spark.read.json('/content/drive/MyDrive/data/farmers-protest-tweets-2021-2-4.json')

# printSchema() writes the schema to stdout itself and returns None, so it must
# not be wrapped in print() -- that would emit a stray "None" after the schema.
rdata.printSchema()
rdata.show(5)

# (row count, column count) of the raw data.
print((rdata.count(), len(rdata.columns)))

root
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: double (nullable = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- previewUrl: string (nullable = true)
 |    |    |-- thumbnailUrl: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- variants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- mentionedUsers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |   

In [6]:
# Keep only the two columns this problem needs: the tweet date (converted with
# to_date) and the author's username taken from the nested `user` struct.
q1_data = rdata.select(
    F.to_date('date').alias('date'),
    'user.username',
)
q1_data.show(5)

+----------+---------------+
|      date|       username|
+----------+---------------+
|2021-02-24|ArjunSinghPanam|
|2021-02-24|     PrdeepNain|
|2021-02-24| parmarmaninder|
|2021-02-24|  anmoldhaliwal|
|2021-02-24|     KotiaPreet|
+----------+---------------+
only showing top 5 rows



In [34]:
# Count tweets per (date, username) pair.
# This is the base table for both the per-date totals and the per-date top user.
activity_keys = ['date', 'username']
date_activity = q1_data.groupBy(*activity_keys).count()
date_activity.show()

+----------+---------------+-----+
|      date|       username|count|
+----------+---------------+-----+
|2021-02-24| BumblebeeUmeed|    1|
|2021-02-24|       htTweets|    2|
|2021-02-24|       v_sanjai|    1|
|2021-02-24|     dr_sonia27|    1|
|2021-02-24|   RamneetMann4|    1|
|2021-02-24|   JPSinghRuhil|    2|
|2021-02-24|   BjpReporting|    1|
|2021-02-24| manesh67726670|    1|
|2021-02-24|        AKulvir|    1|
|2021-02-24|       HAchahal|    1|
|2021-02-24| AnuragVerma_SP|    1|
|2021-02-24|Gurwind33930102|    3|
|2021-02-23|   TechieKisaan|    2|
|2021-02-23| MindsetMatter1|    1|
|2021-02-23|  Nimratkhaira_|    1|
|2021-02-23|    Lats_tweets|    1|
|2021-02-23|Deshbha99450233|    1|
|2021-02-23|     SecondEye5|    1|
|2021-02-23|     alieshan09|    1|
|2021-02-23|      SikhWhite|    1|
+----------+---------------+-----+
only showing top 20 rows



In [14]:
# Roll the per-user counts up into a single total per date.
total_per_date = F.sum('count').alias('total_activity')
date_total_activity = date_activity.groupBy('date').agg(total_per_date)
date_total_activity.show()

+----------+--------------+
|      date|total_activity|
+----------+--------------+
|2021-02-15|          9197|
|2021-02-21|          7532|
|2021-02-19|          8204|
|2021-02-20|          8502|
|2021-02-17|         11087|
|2021-02-24|          3437|
|2021-02-18|          9625|
|2021-02-14|         10249|
|2021-02-12|         12347|
|2021-02-22|          7071|
|2021-02-13|         11296|
|2021-02-23|          8417|
|2021-02-16|         10443|
+----------+--------------+



In [15]:
# Rank the dates by total activity with the row_number window function.
# `date` is added as a secondary sort key so that two dates with the same total
# always receive the same ranks on every run (row_number alone breaks ties
# arbitrarily). The window has no partitionBy, so Spark evaluates it on a single
# partition; that is acceptable here because there is one row per date.
window_spec = Window.orderBy(F.desc("total_activity"), F.asc("date"))
date_total_activity = date_total_activity.withColumn("rank", F.row_number().over(window_spec))

date_total_activity.show()

+----------+--------------+----+
|      date|total_activity|rank|
+----------+--------------+----+
|2021-02-12|         12347|   1|
|2021-02-13|         11296|   2|
|2021-02-17|         11087|   3|
|2021-02-16|         10443|   4|
|2021-02-14|         10249|   5|
|2021-02-18|          9625|   6|
|2021-02-15|          9197|   7|
|2021-02-20|          8502|   8|
|2021-02-23|          8417|   9|
|2021-02-19|          8204|  10|
|2021-02-21|          7532|  11|
|2021-02-22|          7071|  12|
|2021-02-24|          3437|  13|
+----------+--------------+----+



In [18]:
# Keep only the dates whose rank is 10 or lower, i.e. the ten most active dates.
is_top_10 = date_total_activity["rank"] <= 10
top_10_dates = date_total_activity.filter(is_top_10)
top_10_dates.show()

+----------+--------------+----+
|      date|total_activity|rank|
+----------+--------------+----+
|2021-02-12|         12347|   1|
|2021-02-13|         11296|   2|
|2021-02-17|         11087|   3|
|2021-02-16|         10443|   4|
|2021-02-14|         10249|   5|
|2021-02-18|          9625|   6|
|2021-02-15|          9197|   7|
|2021-02-20|          8502|   8|
|2021-02-23|          8417|   9|
|2021-02-19|          8204|  10|
+----------+--------------+----+



In [37]:
# Use row_number again, this time partitioned by date, to keep the single user
# with the most tweets on each date. `username` is a secondary sort key so that
# users tied on activity are resolved the same way on every run.
user_rank_window = (
    Window
    .partitionBy("date")
    .orderBy(F.desc("user_activity"), F.asc("username"))
)
most_active_users_per_date = (
    date_activity
    .groupBy("date", "username")
    .agg(F.sum("count").alias("user_activity"))
    .withColumn("rank", F.row_number().over(user_rank_window))
    .filter("rank = 1")
)

# Attach the top user to each of the top-10 dates.
# A join does not guarantee row order, so the result is sorted explicitly by the
# date rank. The right side is projected to (date, username) first; otherwise
# both sides would carry a `rank` column and orderBy("rank") would be ambiguous.
(
    top_10_dates
    .join(most_active_users_per_date.select("date", "username"), on=["date"], how="left")
    .orderBy("rank")
    .select("date", "username")
    .collect()
)

[Row(date=datetime.date(2021, 2, 12), username='RanbirS00614606'),
 Row(date=datetime.date(2021, 2, 13), username='MaanDee08215437'),
 Row(date=datetime.date(2021, 2, 17), username='RaaJVinderkaur'),
 Row(date=datetime.date(2021, 2, 16), username='jot__b'),
 Row(date=datetime.date(2021, 2, 14), username='rebelpacifist'),
 Row(date=datetime.date(2021, 2, 18), username='neetuanjle_nitu'),
 Row(date=datetime.date(2021, 2, 15), username='jot__b'),
 Row(date=datetime.date(2021, 2, 20), username='MangalJ23056160'),
 Row(date=datetime.date(2021, 2, 23), username='Surrypuria'),
 Row(date=datetime.date(2021, 2, 19), username='Preetm91')]

# Solución problema 2:

# Solución problema 3: