##### Challenge Técnico para la posición de Data Engineer LATAM.

<p> Creacion de notebook que permita realizar el Challenge tecnico para la posición Data Engineer.
 Este notebook se puede ejecutar las veces que sea necesario y estaria listo para pasar a producccion sin errores </p>
 
 ##### Resumen:

 <p> Se realizara la implementación de los siguientes puntos:
 
 * Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días. 
 * Los top 10 emojis más usados con su respectivo conteo.
 * El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.
  </p>

##### Objetivo:

<p> Creacion de notbook que me permita realizar la prueba de Challenge tecnico para la posición Data Engineer - LATAM. </p>

##### Resultados:

<p> Resolucion de Challenge tecnico para la posición Data Engineer </p>

##### Autor:

<p> Teddy Arteaga </p>





Variables globales: se crea variables globales con el fin de que puedar se reutilizadas: 

In [1]:
file_path = "../data/farmers-protest-tweets-2021-2-4.json"

#### Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días. 

Por temas de que se note las librerias que se utilizan en cada función, se van a ir colocando.
Para la primera función *q1_time* se esta priorizando el tiempo de ejecución a continuación detallo lo que se esta realizando:

* Para procesar la información utilizaremos pandas que es una muy buena opción debido a su versatilidad.
* Como buena practica utilizamos el uso de exceptiones con el fin de poder manejar FileNotFoundError cuando el archivo no se encuentre.
* En lugar de leer el archivo JSON y luego convertirlo a Parquet, la función intenta leer directamente el archivo Parquet si este ya existe
* No realizamos ninguna operación que afecte al tiempo de ejecución(Eliminación de columnas, conversión en todos los datos, etc) hacemos el conteno directamente en los datos que son relevantes
* Para cada fecha con más tweets, se utiliza nlargest(1, 'count') en el DataFrame date_user_counts para obtener directamente el usuario con más tweets para esa fecha, en lugar de filtrar el DataFrame repetidamente.

In [2]:
import pandas as pd
from typing import List, Tuple
from datetime import date

def q1_time(file_path: str) -> List[Tuple[date, str]]:
    # Leer el archivo JSON directamente si el archivo Parquet no existe
    try:
        df = pd.read_parquet(file_path.replace('.json', '.parquet'))
    except FileNotFoundError:
        df = pd.read_json(file_path, lines=True)
        df['date'] = pd.to_datetime(df['date']).dt.date
        df['username'] = df['user'].apply(lambda x: x.get('username'))
        df.drop(columns=['user'], inplace=True)
        df.to_parquet(file_path.replace('.json', '.parquet'), index=False)

    # Contar el número de tweets por fecha y usuario
    date_user_counts = df.groupby(['date', 'username']).size().reset_index(name='count')

    # Encontrar las 10 fechas con más tweets
    top_dates = df['date'].value_counts().nlargest(10).index

    # Obtener el usuario con más tweets para cada una de las 10 fechas
    result = []
    for date_val in top_dates:
        top_user_df = date_user_counts[date_user_counts['date'] == date_val].nlargest(1, 'count')
        top_user = top_user_df['username'].iloc[0]
        result.append((date_val, top_user))

    return result



Optimización en el uso de la memoria:
Para este caso vamos a leer directamente el archivo json.
* Con top_users_by_date mantenemos un diccionario con el fin de mantener los usuarios con mas tweets
* Eliminamos date_counts ya que no se utiliza para los datos finales
* Se optimiza la actualización del diccionario top_users_by_date para mantener solo la información necesaria para obtener el usuario con más tweets por fecha.

In [4]:
from datetime import datetime
from typing import List, Tuple
from collections import defaultdict
import ujson as json

def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    # Diccionario para mantener el conteo de tweets por fecha
    date_counts = defaultdict(int)
    # Diccionario para mantener el usuario con más tweets por fecha
    top_users_by_date = {}
    
    with open(file_path, "r") as file:
        for line in file:
            tweet = json.loads(line)
            # Se extrae la fecha del tweet y se convierte a datetime.date
            tweet_date = datetime.strptime(tweet['date'].split("T")[0], '%Y-%m-%d').date()
            
            # Se actualiza el conteo de tweets para esa fecha
            date_counts[tweet_date] += 1
            
            # Se actualiza el usuario con más tweets para esa fecha
            if tweet_date not in top_users_by_date:
                top_users_by_date[tweet_date] = defaultdict(int)
            top_users_by_date[tweet_date][tweet['user']['username']] += 1
    
    # Se obtienen las top 10 fechas con más tweets
    top_dates = sorted(date_counts, key=date_counts.get, reverse=True)[:10]
    
    # Se obtiene el usuario con más tweets para cada fecha
    result = [(date, max(top_users_by_date[date], key=top_users_by_date[date].get)) for date in top_dates]
    
    return result



#### Los top 10 emojis más usados con su respectivo conteo. 

Enfoque de tiempo de ejecución

In [5]:
import os
import pandas as pd
import emoji
from collections import Counter
from typing import List, Tuple

def q2_time(file_path: str) -> List[Tuple[str, int]]:
    # Se ajusta la ruta de lectura de archivo json a .parquet para futuras lecturas
    parquet_file = file_path.replace('.json', '.parquet')
    
    try:
        # Se intenta leer el archivo parquet
        df = pd.read_parquet(parquet_file)
    except FileNotFoundError:
        # Si el archivo parquet no existe, se lee el archivo json y se convierte a parquet
        df = pd.read_json(file_path, lines=True)
        df.to_parquet(parquet_file, index=False)
    
    # Se extraen todos los emojis de la columna 'content' y se cuenta su frecuencia
    all_emojis = []
    for content in df['content']:
        emojis_in_content = [entry['emoji'] for entry in emoji.emoji_list(content)]
        all_emojis.extend(emojis_in_content)
    # Se cuenta la frecuencia de cada emoji
    emoji_counts = Counter(all_emojis)

    # Se obtienen los top 10 emojis más utilizados
    top_emojis = emoji_counts.most_common(10)
    
    return top_emojis


Enfoque de memoria optimatizada

In [6]:
import ujson as json
from collections import Counter
from typing import List, Tuple
import emoji

def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    # Se crea un contador para contar la frecuencia de cada emoji
    emoji_counts = Counter()
    
    # Se abre el archivo JSON y se itera sobre cada línea
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Se convierte la línea en un diccionario
            tweet = json.loads(line)
            # Se extrae el contenido del tweet y se obtienen los emojis
            content = tweet.get('content', '')
            emojis_in_content = [entry['emoji'] for entry in emoji.emoji_list(content)]
            
            # Se actualiza el contador con los emojis encontrados en este tweet
            emoji_counts.update(emojis_in_content)

    # Se obtienen los top 10 emojis más utilizados
    top_emojis = emoji_counts.most_common(10)
    
    return top_emojis



##### El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.

Enfoque tiempo de ejecución.

In [7]:
import os
import pandas as pd
from collections import Counter
from typing import List, Tuple

def q3_time(file_path: str) -> List[Tuple[str, int]]:
    # Se ajusta la ruta de lectura del archivo JSON a .parquet para futuras lecturas
    parquet_file = file_path.replace('.json', '.parquet')

    try:
        # Intentar leer el archivo Parquet si existe
        df = pd.read_parquet(parquet_file)
    except FileNotFoundError:
        try:
            # Intentar leer el archivo JSON y convertirlo a DataFrame
            df = pd.read_json(file_path, lines=True)
            # Procesar los datos y guardarlos como archivo Parquet
            df.to_parquet(parquet_file, index=False)
        except Exception as e:
            print(f"Error al procesar el archivo: {e}")
            return []

    # Se extraen todas las menciones por cada tweet
    mentions_flat = []
    for mentions_list in df['content'].str.findall(r'@(\w+)').dropna():
        mentions_flat.extend(mentions_list)

    # Se cuenta la frecuencia de cada mención en la lista aplanada
    mention_counts = Counter(mentions_flat)

    # Obtener los top 10 usuarios más mencionados
    top_mentions = mention_counts.most_common(10)

    return top_mentions


Enfoque de memoria optomatizada

In [2]:
import ujson as json
from collections import Counter
from typing import List, Tuple
import re

def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    # Se crea un Counter para contar la frecuencia de cada mención
    mention_counts = Counter()
    
    # Se abre el archivo json y se itera sobre cada tweet
    with open(file_path, 'r') as file:
        # Iterar línea por línea en el archivo
        for line in file:
            tweet = json.loads(line)
            content = tweet.get('content', '')
            # Se busca todas las menciones en el contenido y se actualiza el Counter
            mention_counts.update(re.findall(r'@(\w+)', content))
            
    # Obtener los top 10 usuarios más mencionados
    top_mentions = mention_counts.most_common(10)
    
    return top_mentions


#### Resultados



In [62]:
q1_time(file_path)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

In [63]:
q1_memory(file_path)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

In [64]:
q2_time(file_path)

[('🙏', 5049),
 ('😂', 3072),
 ('🚜', 2972),
 ('🌾', 2182),
 ('🇮🇳', 2086),
 ('🤣', 1668),
 ('✊', 1651),
 ('❤️', 1382),
 ('🙏🏻', 1317),
 ('💚', 1040)]

In [65]:
q2_memory(file_path)

[('🙏', 5049),
 ('😂', 3072),
 ('🚜', 2972),
 ('🌾', 2182),
 ('🇮🇳', 2086),
 ('🤣', 1668),
 ('✊', 1651),
 ('❤️', 1382),
 ('🙏🏻', 1317),
 ('💚', 1040)]

In [66]:
q3_time(file_path)

[('narendramodi', 2261),
 ('Kisanektamorcha', 1836),
 ('RakeshTikaitBKU', 1639),
 ('PMOIndia', 1422),
 ('RahulGandhi', 1125),
 ('GretaThunberg', 1046),
 ('RaviSinghKA', 1015),
 ('rihanna', 972),
 ('UNHumanRights', 962),
 ('meenaharris', 925)]

In [67]:
q3_memory(  )

[('narendramodi', 2261),
 ('Kisanektamorcha', 1836),
 ('RakeshTikaitBKU', 1639),
 ('PMOIndia', 1422),
 ('RahulGandhi', 1125),
 ('GretaThunberg', 1046),
 ('RaviSinghKA', 1015),
 ('rihanna', 972),
 ('UNHumanRights', 962),
 ('meenaharris', 925)]

##### Mejoras referente a la carga del archivo

* Si el archivo sigue creciendo sugiero dividir el archivo en lotes con el fin que pueda caber en la memoria RAM, leer y procesar un número limitado de registros a la vez en lugar de cargar todo el archivo en la memoria de una vez.
* Tener una estructura de datos eficionte lo que nos ayude automatizar el tamaño del archivo. Por ejemplo que el archivo venga directamente en parquet debido su alamacenamiento columnar lo que reduciria considerablemente el archivo.

Pruebas de las funciones

In [9]:
from memory_profiler import profile
import cProfile
import pstats
from datetime import datetime

# Define tus funciones q1_memory y q1_time aquí

@profile
def test_memory_usage(file_path):
    # Llama a tus funciones aquí
    q1_memory(file_path)
    q1_time(file_path)

if __name__ == "__main__":
   
    # Prueba de q1_memory
    print("Evaluacion de memoria q1_memory")
    test_memory_usage(file_path)
    
    # Prueba de q1_time
    profiler = cProfile.Profile()
    profiler.enable()
    # Codigo a evaluar
    q1_time(file_path)
    profiler.disable()
    profiler.dump_stats("q1_time_stats.pstats")
    stats = pstats.Stats("q1_time_stats.pstats")
    stats.sort_stats("cumulative")
    # Se muestran resultados ordenados por tiempo de ejecucion, del mas lento al mas rapido
    stats.print_stats(10)


Evaluacion de memoria q1_memory
ERROR: Could not find file C:\Users\teddy\AppData\Local\Temp\ipykernel_90700\2435509172.py
Sun Mar 17 20:51:21 2024    q1_time_stats.pstats

         25675 function calls (25107 primitive calls) in 1.083 seconds

   Ordered by: cumulative time
   List reduced from 684 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    1.083    1.083 C:\Users\teddy\AppData\Local\Temp\ipykernel_90700\1997928156.py:5(q1_time)
        1    0.001    0.001    1.022    1.022 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\io\parquet.py:501(read_parquet)
        1    0.000    0.000    1.020    1.020 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\io\parquet.py:237(read)
        1    0.000    0.000    0.746    0.746 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\pyarrow\pandas_compat.py:752(table_to_dataframe)

In [10]:
# Define tus funciones q1_memory y q1_time aquí

@profile
def test_memory_usage(file_path):
    # Llama a tus funciones aquí
    q2_memory(file_path)
    q2_time(file_path)

if __name__ == "__main__":
   
    # Prueba de q1_memory
    print("Evaluacion de memoria q1_memory")
    test_memory_usage(file_path)
    
    # Prueba de q1_time
    profiler = cProfile.Profile()
    profiler.enable()
    # Codigo a evaluar
    q2_time(file_path)
    q2_memory(file_path)
    profiler.disable()
    profiler.dump_stats("q2_time_stats.pstats")
    stats = pstats.Stats("q2_time_stats.pstats")
    stats.sort_stats("cumulative")
    # Se muestran resultados ordenados por tiempo de ejecucion, del mas lento al mas rapido
    stats.print_stats(10)

Evaluacion de memoria q1_memory
ERROR: Could not find file C:\Users\teddy\AppData\Local\Temp\ipykernel_90700\1703600861.py
Sun Mar 17 21:09:49 2024    q2_time_stats.pstats

         172298652 function calls (172298634 primitive calls) in 37.359 seconds

   Ordered by: cumulative time
   List reduced from 379 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   234814    6.043    0.000   34.362    0.000 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\emoji\core.py:283(emoji_list)
      7/3    0.046    0.007   26.366    8.789 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py:1908(_run_once)
 34281412   16.141    0.000   26.237    0.000 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\emoji\tokenizer.py:158(tokenize)
        8    0.063    0.008   21.014    2.627 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\selectors.py:313(_select)
        7  

In [11]:
# Define tus funciones q1_memory y q1_time aquí

@profile
def test_memory_usage(file_path):
    # Llama a tus funciones aquí
    q3_memory(file_path)
    q3_time(file_path)

if __name__ == "__main__":
   
    # Prueba de q1_memory
    print("Evaluacion de memoria q1_memory")
    test_memory_usage(file_path)
    
    # Prueba de q1_time
    profiler = cProfile.Profile()
    profiler.enable()
    # Codigo a evaluar
    q3_time(file_path)
    q3_memory(file_path)
    profiler.disable()
    profiler.dump_stats("q3_time_stats.pstats")
    stats = pstats.Stats("q3_time_stats.pstats")
    stats.sort_stats("cumulative")
    # Se muestran resultados ordenados por tiempo de ejecucion, del mas lento al mas rapido
    stats.print_stats(10)

Evaluacion de memoria q1_memory
ERROR: Could not find file C:\Users\teddy\AppData\Local\Temp\ipykernel_90700\2046086110.py
Sun Mar 17 21:10:28 2024    q3_time_stats.pstats

         1510390 function calls (1510372 primitive calls) in 3.683 seconds

   Ordered by: cumulative time
   List reduced from 443 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.012    0.012    2.059    2.059 C:\Users\teddy\AppData\Local\Temp\ipykernel_90700\1467250520.py:6(q3_time)
        1    0.001    0.001    1.992    1.992 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\io\parquet.py:501(read_parquet)
        1    0.000    0.000    1.991    1.991 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\pandas\io\parquet.py:237(read)
        1    0.000    0.000    1.587    1.587 c:\Users\teddy\AppData\Local\Programs\Python\Python312\Lib\site-packages\pyarrow\pandas_compat.py:752(table_to_datafr