# Challenge Data Engineer

Este Notebook contiene las soluciones para resolver tres problemas usando la libreria de pyspark, consta de 4 secciones:
- Inicialización, en donde se importa las librerias, constantes y funciones que se usan en el resto del código
- Una sección por cada uno de los tres problemas del reto. Cada solución contiene la descripción, las soluciones con dos versiones, una para optimizar tiempo y otra para memoria y los resultados

## Inicialización

Ejecutar esta linea de codigo sin comentar solamente una vez al inicio de la sesión para instalar el package de emoji

In [1]:
!pip install emoji  #API reference: https://carpedm20.github.io/emoji/docs/api.html

Collecting emoji
  Obtaining dependency information for emoji from https://files.pythonhosted.org/packages/96/c6/0114b2040a96561fd1b44c75df749bbd3c898bf8047fb5ce8d7590d2dee6/emoji-2.8.0-py2.py3-none-any.whl.metadata
  Downloading emoji-2.8.0-py2.py3-none-any.whl.metadata (5.3 kB)
Downloading emoji-2.8.0-py2.py3-none-any.whl (358 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m358.9/358.9 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: emoji
Successfully installed emoji-2.8.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, to_date, col, desc, row_number, udf, explode
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as F

from pyspark.sql.window import Window

from typing import List, Tuple
import datetime
import emoji
import re

### Definición de constantes

In [3]:
file_path = "farmers-protest-tweets-2021-2-4.json"

Definir una función que genera la session de Spark u obtiene una versión ya existente

In [4]:
def get_spark_session() -> SparkSession:
    return (SparkSession.builder
            .appName("TwitterAnalysis")
            .getOrCreate())

## Problema 1
Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene
por cada uno de esos días. Debe incluir las siguientes funciones:
- def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
- def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:

### Solución q1_time

In [75]:
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
        Funcion para la optimizacion basada en tiempo, aqui se hace uso de cache para cada uno de los DataFrames
        intermedios aunque el uso de memoria es mas extensivo, ademas se utiliza broadcast join para mejorar la
        eficiencia en red y la paralelizacion en los nodos
    """
    
    spark = get_spark_session()

    date_format = "yyyy-MM-dd'T'HH:mm:ssXXX"
    
    tweetsDF = (spark.read.option("inferSchema","true")
                       .option("header","true")
                       .json(file_path)
                       .select("date", "user.username")
                       .withColumn("date", to_date(col("date"), date_format))
               ).cache()  # Cache tweetsDF debido a que es usado multiples veces

    top_dates_df = (tweetsDF.groupBy("date")
                           .agg(count("*").alias("count"))
                           .orderBy(desc("count"))
                           .limit(10)
                   ).cache()  # Cache top_dates_df debido a que es usado multiples veces

    # Se usa broadcast join ya que es un conjunto pequeño de fechas para acelerar la operación de join, 
    filtered_df = tweetsDF.join(F.broadcast(top_dates_df), "date")

    grouped_df = (filtered_df.groupBy("date", "username")
                             .agg(count("*").alias("count"))
                  ).cache()  # Cache grouped_df debido a que es usado multiples veces

    windowSpec = Window.partitionBy("date").orderBy(desc("count"))
    
    top_user_df = (grouped_df.withColumn("row_number", row_number().over(windowSpec))
                          .filter(col("row_number") == 1)
                          .drop("row_number")
                  ).withColumnRenamed("count", "tweets_by_user")
    
    top_dates_with_users = (top_user_df.join(top_dates_df, "date")
                                   .orderBy(desc("count"), "date")
                            .collect()
                           )

    return [(row['date'], row['username']) for row in top_dates_with_users]

In [76]:
print("----- time -------")
q1_time_result = q1_time(file_path)
for e in q1_time_result:
    print(e)

----- time -------
(datetime.date(2021, 2, 12), 'RanbirS00614606')
(datetime.date(2021, 2, 13), 'MaanDee08215437')
(datetime.date(2021, 2, 17), 'RaaJVinderkaur')
(datetime.date(2021, 2, 16), 'jot__b')
(datetime.date(2021, 2, 14), 'rebelpacifist')
(datetime.date(2021, 2, 18), 'neetuanjle_nitu')
(datetime.date(2021, 2, 15), 'jot__b')
(datetime.date(2021, 2, 20), 'MangalJ23056160')
(datetime.date(2021, 2, 23), 'Surrypuria')
(datetime.date(2021, 2, 19), 'Preetm91')


### Solución q1_memory

In [77]:
def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
        Funcion para la optimizacion basada en memoria, se utilizan utiliza la menor cantidad
        de transformaciones necesarias, para evitar el uso excesivo de memoria
    """
    spark = get_spark_session()

    date_format = "yyyy-MM-dd'T'HH:mm:ssXXX"
    
    tweetsDF = (spark.read.option("inferSchema", "true")
                       .option("header", "true")
                       .json(file_path)
                       .select("date", "user.username")
                       .withColumn("date", to_date(col("date"), date_format))
               )

    # Calcular el numero de tweets por usuario
    grouped_df = (tweetsDF.groupBy("date", "username")
                          .agg(count("*").alias("count"))
                 )

    # se definen funciones de ventana para obtener el top de fechas y el usuario con mas tweets
    windowSpecDates = Window.orderBy(desc("count_dates"))
    windowSpecUsers = Window.partitionBy("date").orderBy(desc("count"))

    # Calcular el top 10 de fechas y el top 1 de usuarios usando las funciones de ventana en una sola operacion
    result = (grouped_df.groupBy("date")
                      .agg(F.sum("count").alias("count_dates"))
                      .withColumn("rank_dates", row_number().over(windowSpecDates))
                      .filter(col("rank_dates") <= 10)
                      .join(grouped_df, "date")
                      .withColumn("rank_users", row_number().over(windowSpecUsers))
                      .filter(col("rank_users") == 1)
                      .orderBy(desc("count_dates"), "date")
                      .select("date", "username")
                      .collect()
              )

    return [(row['date'], row['username']) for row in result]


Presentación de los resultados

In [78]:
print("----- memory -------")
q1_memory_result = q1_memory(file_path)
for e in q1_memory_result:
    print(e)

----- memory -------
(datetime.date(2021, 2, 12), 'RanbirS00614606')
(datetime.date(2021, 2, 13), 'MaanDee08215437')
(datetime.date(2021, 2, 17), 'RaaJVinderkaur')
(datetime.date(2021, 2, 16), 'jot__b')
(datetime.date(2021, 2, 14), 'rebelpacifist')
(datetime.date(2021, 2, 18), 'neetuanjle_nitu')
(datetime.date(2021, 2, 15), 'jot__b')
(datetime.date(2021, 2, 20), 'MangalJ23056160')
(datetime.date(2021, 2, 23), 'Surrypuria')
(datetime.date(2021, 2, 19), 'Preetm91')


## Problema 2
Los top 10 emojis más usados con su respectivo conteo. Debe incluir las siguientes funciones:
- def q2_time(file_path: str) -> List[Tuple[str, int]]:
- def q2_memory(file_path: str) -> List[Tuple[str, int]]:

### Solución q2_time

Definir una función que extrae todos los emojis usando el paquete de emoji (instalado al incio del notebook) emoji.emoji_list

In [79]:
def extract_emojis(text):
    return [char['emoji'] for char in emoji.emoji_list(text)]

# Registrar la función como una UDF
extract_emojis_udf = udf(extract_emojis, ArrayType(StringType()))

In [80]:
def q2_time(file_path: str) -> List[Tuple[str, int]]:
    """
        Funcion para la optimizacion basada en tiempo, Para optimizar en velocidad se hace uso del cache
    """
    spark = get_spark_session()
    
    tweetsDF = (spark.read.option("inferSchema", "true")
                       .option("header", "true")
                       .json(file_path)
                       .select("content")
               ).cache() #Se utiliza cache para almacenar en memoria
    
    tweets_with_emojis_df = tweetsDF.withColumn("emojis", extract_emojis_udf(tweetsDF["content"])).cache()
   
    df_only_emojis = tweets_with_emojis_df.filter(F.size("emojis") > 0)

    top_emojis = (df_only_emojis.select(F.explode("emojis").alias("emoji"))
                                  .groupBy("emoji")
                                  .agg(F.count("*").alias("count"))
                                  .orderBy(F.desc("count"))
                                  .limit(10))

    return [(row['emoji'], row['count']) for row in top_emojis.collect()]


Presentación de los resultados

In [81]:
q2_time_result = q2_time(file_path)
for e in q2_time_result:
    print(e)

('🙏', 5049)
('😂', 3072)
('🚜', 2972)
('🌾', 2182)
('🇮🇳', 2086)
('🤣', 1668)
('✊', 1651)
('❤️', 1382)
('🙏🏻', 1317)
('💚', 1040)


### Solución q2_memory

Nota: Tambien se debe implementar la función de extract_emojis descrita anteriormente

In [82]:
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    """
        Funcion para la optimizacion basada en memoria. Se intenta hacer el menor numero de transformaciones posibles
    """
    spark = get_spark_session()
    
    tweetsDF = (spark.read.option("inferSchema", "true")
                       .option("header", "true")
                       .json(file_path)
                       .select("content"))
   
    tweets_with_emojis_df = tweetsDF.withColumn("emojis", extract_emojis_udf(tweetsDF["content"]))

    # Directamente realizar operaciones en los datos sin almacenar en caché
    top_emojis = (tweets_with_emojis_df.filter(F.size("emojis") > 0)
                                         .select(F.explode("emojis").alias("emoji"))
                                         .groupBy("emoji")
                                         .agg(F.count("*").alias("count"))
                                         .orderBy(F.desc("count"))
                                         .limit(10))

    return [(row['emoji'], row['count']) for row in top_emojis.collect()]

Presentación de los resultados

In [83]:
q2_mem_result = q2_memory(file_path)
for e in q2_mem_result:
    print(e)

('🙏', 5049)
('😂', 3072)
('🚜', 2972)
('🌾', 2182)
('🇮🇳', 2086)
('🤣', 1668)
('✊', 1651)
('❤️', 1382)
('🙏🏻', 1317)
('💚', 1040)


## Problema 3
El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@)
que registra cada uno de ellos. Debe incluir las siguientes funciones:
  - def q3_time(file_path: str) -> List[Tuple[str, int]]:
  - def q3_memory(file_path: str) -> List[Tuple[str, int]]:

###  Solución q3_time

Definir una función que extrae menciones usando expresiones regulares

In [67]:
def extract_mentions(text):
    return re.findall(r"@\w+", text)

# Registrar la función como una UDF
extract_mentions_udf = udf(extract_mentions, ArrayType(StringType()))

In [68]:
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    """
        Para optimizar en velocidad se hace uso del cache
    """
    spark = get_spark_session()
    
    tweetsDF = (spark.read.option("inferSchema", "true")
                       .option("header", "true")
                       .json(file_path)
                       .select("content")).cache()
    
    df_with_mentions = tweetsDF.withColumn("mentions", extract_mentions_udf(tweetsDF["content"])).cache()
   
    top_mentions = (df_with_mentions.select(F.explode("mentions").alias("mention"))
                                    .groupBy("mention")
                                    .agg(F.count("*").alias("count"))
                                    .orderBy(F.desc("count"))
                                    .limit(10))
    
    return [(row['mention'], row['count']) for row in top_mentions.collect()]


Presentación de los resultados

In [70]:
q3_time_result = q3_time(file_path)
for e in q3_time_result:
    print(e)

('@narendramodi', 2261)
('@Kisanektamorcha', 1836)
('@RakeshTikaitBKU', 1639)
('@PMOIndia', 1422)
('@RahulGandhi', 1125)
('@GretaThunberg', 1046)
('@RaviSinghKA', 1015)
('@rihanna', 972)
('@UNHumanRights', 962)
('@meenaharris', 925)


###  Solución q3_memory

Nota: Tambien se debe implementar la función de extrac_mentions descrita anteriormente

In [73]:
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    """
        Para optimizar en memoria se evita el uso del cache
    """
    spark = get_spark_session()
    
    tweetsDF = (spark.read.option("inferSchema", "true")
                       .option("header", "true")
                       .json(file_path)
                       .select("content"))
    
    df_with_mentions = tweetsDF.withColumn("mentions", extract_mentions_udf(tweetsDF["content"]))

    # Directamente realizar operaciones en los datos sin almacenar en caché
    top_mentions = (df_with_mentions.select(F.explode("mentions").alias("mention"))
                                    .groupBy("mention")
                                    .agg(F.count("*").alias("count"))
                                    .orderBy(F.desc("count"))
                                    .limit(10))
    
    return [(row['mention'], row['count']) for row in top_mentions.collect()]


Presentación de los resultados

In [74]:
q3_memory_result = q3_memory(file_path)
for e in q3_memory_result:
    print(e)

('@narendramodi', 2261)
('@Kisanektamorcha', 1836)
('@RakeshTikaitBKU', 1639)
('@PMOIndia', 1422)
('@RahulGandhi', 1125)
('@GretaThunberg', 1046)
('@RaviSinghKA', 1015)
('@rihanna', 972)
('@UNHumanRights', 962)
('@meenaharris', 925)
