# Challenge DE
En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

Primero cargamos las extensiones para analizar el consumo de memoria y los tiempos de ejecucion de cada una de las funciones desarrolladas.

In [1]:
%load_ext memory_profiler
%load_ext line_profiler

Luego definimos el path donde tenemos almacenados los datos a procesar.

In [2]:
FILE_PATH = "../data/farmers-protest-tweets-2021-2-4.json"

## Análisis de memoria
Empecemos por importar las funciones que optimizan el bajo uso de memoria sobre el tiempo de ejecución.

In [3]:
from q1_memory import q1_memory
from q2_memory import q2_memory
from q3_memory import q3_memory

Analicemos línea por línea el uso de memoria para cada una de las funciones.

In [4]:
%mprun -f q1_memory q1_mem_result = q1_memory(FILE_PATH)




Filename: /home/juaquin/Descargas/latam-challenge/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    15     65.9 MiB     65.9 MiB           1   def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    16                                             """
    17                                             Gets the top 10 dates with the most tweets and the user with the most
    18                                             posts for each day while optimizing memory usage.
    19                                         
    20                                             Args:
    21                                                 file_path: The path to the input JSON file.
    22                                         
    23                                             Returns:
    24                                                 A list of tuples, where each tuple contains a date and its
    25                                                

In [5]:
%mprun -f q2_memory q2_mem_result = q2_memory(FILE_PATH)




Filename: /home/juaquin/Descargas/latam-challenge/src/q2_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    15     69.5 MiB     69.5 MiB           1   def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    16                                             """
    17                                             Gets the top 10 most used emojis and its counts while optimizing memory
    18                                             usage.
    19                                         
    20                                             Args:
    21                                                 file_path: The path to the input JSON file.
    22                                         
    23                                             Returns:
    24                                                 A list of tuples, where each tuple contains an emoji and its
    25                                                     corresponding count.
    26                 

In [6]:
%mprun -f q3_memory q3_mem_result = q3_memory(FILE_PATH)




Filename: /home/juaquin/Descargas/latam-challenge/src/q3_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    14     69.5 MiB     69.5 MiB           1   def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    15                                             """
    16                                             Gets the top 10 most mentioned users and its counts while optimizing
    17                                             memory usage.
    18                                         
    19                                             Args:
    20                                                 file_path: The path to the input JSON file.
    21                                         
    22                                             Returns:
    23                                                 A list of tuples, where each tuple contains a username and its
    24                                                     corresponding count.
    25           

Las tres funciones rondan los 70 MiB de uso de memoria. No se aprecia ningún incremento considerable línea por línea.

Ahora comparémoslas con sus funciones contraparte, es decir, las que optimizan el tiempo de ejecución sobre el uso de memoria. En estos casos lo único que nos interesa es el uso máximo, así que no analizaremos línea por línea.

In [7]:
from q1_time import q1_time
from q2_time import q2_time
from q3_time import q3_time


%memit q1_time_result = q1_time(FILE_PATH)
%memit q2_time_result = q2_time(FILE_PATH)
%memit q3_time_result = q3_time(FILE_PATH)

peak memory: 98.01 MiB, increment: 20.88 MiB
peak memory: 80.74 MiB, increment: 0.00 MiB
peak memory: 80.74 MiB, increment: 0.00 MiB


Hay un incremento en el uso de memoria de alrededor de 10 MiB para `q2` y `q3`, mientras que para `q1` es de más de 20 MiB. Esto seguramente se debe a que es la única función que utiliza `numpy`. Más adelante analizaremos la razón de esto.

## Validación de resultados
Antes de pasar al análisis de los tiempos de ejecución, primero validemos si, en efecto, sendas funciones coinciden en resultados.

In [8]:
print("q1", q1_mem_result == q1_time_result, q1_mem_result)
print("q2", q2_mem_result == q2_time_result, q2_mem_result)
print("q3", q3_mem_result == q3_time_result, q3_mem_result)

q1 True [(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]
q2 True [('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218), ('👇', 1108)]
q3 True [('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


### Consideraciones
Efectivamente, las funciones están obteniendo los mismos resultados que sus respectivas contrapartes.

Es importante mencionar algunas consideraciones que se tomaron en cuenta para llegar a estos resultados:

- Para `q1`, la contabilización de tweets **no** toma en cuenta los campos `retweetedTweet` y `quotedTweet`.
- Para `q2`, la contabilización de emojis **no** toma en cuenta el contenido de los campos `retweetedTweet` y `quotedTweet`.
- Para `q2`, la contabilización de emojis toma en cuenta únicamente los caracters incluidos en la siguiente expresión regular:
    ```python
    EMOJI_PATTERN = (
        r"[\U0001F300-\U0001F5FF" # Miscellaneous Symbols and Pictographs
        r"\U0001F600-\U0001F64F"  # Emoticons
        r"\U0001F680-\U0001F6FF"  # Transport and Map Symbols
        r"\U0001F700-\U0001F77F"  # Alchemical Symbols
        r"\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
        r"\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
        r"\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
        r"\U0001FA00-\U0001FA6F"  # Chess Symbols
        r"\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
        r"\U00002702-\U000027B0"  # Dingbats
        r"\U0001F004-\U0001F0CF]"  # Mahjong Tiles, Dominoes, Playing Cards
    )
    ```
- Para `q3`, la contabilización de los usuarios más influyentes está en función de los usuarios que aparecen en los arreglos de los campos `mentionedUsers`.

## Análisis de tiempo de ejecución
Avancemos ahora con el análisis de los tiempos de ejecución.

Comparemos el rendimiento de cada uno de los tres pares de funciones.

In [9]:
%timeit q1_memory(FILE_PATH)
%timeit q1_time(FILE_PATH)

2.8 s ± 24.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.15 s ± 9.92 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%timeit q2_memory(FILE_PATH)
%timeit q2_time(FILE_PATH)

3.06 s ± 17.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.38 s ± 19.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
%timeit q2_memory(FILE_PATH)
%timeit q2_time(FILE_PATH)

3.01 s ± 37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.38 s ± 18.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


En los tres casos el tiempo de ejecución se reduce a menos de la mitad. Veamos el análisis línea por línea.

In [12]:
%prun q1_time(FILE_PATH)

 

         452323 function calls in 1.291 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.615    0.615    1.290    1.290 q1_time.py:18(q1_time)
   117407    0.508    0.000    0.508    0.000 {built-in method msgspec._core.json_decode}
    49772    0.048    0.000    0.048    0.000 {built-in method _codecs.utf_8_decode}
        1    0.035    0.035    0.035    0.035 {built-in method _collections._count_elements}
        2    0.028    0.014    0.028    0.014 {built-in method numpy.array}
    49772    0.025    0.000    0.073    0.000 <frozen codecs>:319(decode)
   234834    0.015    0.000    0.015    0.000 {method 'append' of 'list' objects}
       10    0.011    0.001    0.011    0.001 {method 'sort' of 'numpy.ndarray' objects}
       10    0.002    0.000    0.014    0.001 _arraysetops_impl.py:339(_unique1d)
        1    0.001    0.001    1.290    1.290 <string>:1(<module>)
       10    0.000    0.000    0.000    0.

In [13]:
%prun q2_time(FILE_PATH)

 

         921421 function calls in 1.676 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.618    0.618    1.676    1.676 q2_time.py:18(q2_time)
   117407    0.515    0.000    0.515    0.000 {built-in method msgspec._core.json_decode}
   117407    0.299    0.000    0.299    0.000 {method 'findall' of 're.Pattern' objects}
   117408    0.052    0.000    0.164    0.000 __init__.py:660(update)
    49772    0.049    0.000    0.049    0.000 {built-in method _codecs.utf_8_decode}
   117408    0.042    0.000    0.087    0.000 {built-in method builtins.isinstance}
    49772    0.030    0.000    0.079    0.000 <frozen codecs>:319(decode)
   117407    0.025    0.000    0.025    0.000 {built-in method _collections._count_elements}
   117407    0.022    0.000    0.045    0.000 <frozen abc>:117(__instancecheck__)
   117407    0.022    0.000    0.022    0.000 {built-in method _abc._abc_instancecheck}
        1    0.001    0.0

In [14]:
%prun q3_time(FILE_PATH)

 

         232229 function calls in 1.190 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.609    0.609    1.190    1.190 q3_time.py:16(q3_time)
   117407    0.502    0.000    0.502    0.000 {built-in method msgspec._core.json_decode}
    49772    0.048    0.000    0.048    0.000 {built-in method _codecs.utf_8_decode}
    49772    0.027    0.000    0.075    0.000 <frozen codecs>:319(decode)
        1    0.002    0.002    0.002    0.002 heapq.py:523(nlargest)
    15239    0.001    0.000    0.001    0.000 __init__.py:601(__missing__)
        1    0.001    0.001    0.001    0.001 {built-in method io.open}
        1    0.000    0.000    1.190    1.190 <string>:1(<module>)
        1    0.000    0.000    0.002    0.002 __init__.py:610(most_common)
        1    0.000    0.000    1.190    1.190 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 {method '__exit__' of '_io._IOBase' objects}
    

Entonces, en los tres casos, poco menos de la mitad del tiempo de ejecución lo acapara `msgspec` al momento de decodificar las líneas del archivo JSON. Esto podría considerarse un cuello de botella, aunque en realidad `msgspec` es una herramienta bastante rápida para leer archivos JSON. Hagamos una comparación sencilla con el tiempo que le tomaría a la librería estándar `json`.

In [15]:
from json import loads

from msgspec.json import decode

from schemas import Tweet


def read_json(file_path: str):
    with open(file_path, encoding="utf-8") as json_file:
        for line in json_file:
            _ = loads(line)


def read_json_with_schema(file_path: str):
    with open(file_path, encoding="utf-8") as json_file:
        for line in json_file:
            _ = decode(line, type=Tweet)

In [16]:
%prun read_json(FILE_PATH)

 

         1273623 function calls in 3.165 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   117407    1.982    0.000    1.982    0.000 decoder.py:343(raw_decode)
        1    0.705    0.705    3.165    3.165 1208675.py:8(read_json)
   117407    0.152    0.000    2.243    0.000 decoder.py:332(decode)
   117407    0.097    0.000    2.374    0.000 __init__.py:299(loads)
   234814    0.077    0.000    0.077    0.000 {method 'match' of 're.Pattern' objects}
    49772    0.053    0.000    0.053    0.000 {built-in method _codecs.utf_8_decode}
    49772    0.032    0.000    0.085    0.000 <frozen codecs>:319(decode)
   117407    0.023    0.000    0.023    0.000 {method 'startswith' of 'str' objects}
   234814    0.020    0.000    0.020    0.000 {method 'end' of 're.Match' objects}
   117407    0.012    0.000    0.012    0.000 {built-in method builtins.len}
   117407    0.012    0.000    0.012    0.000 {built-in method builtins.isin

In [17]:
%prun read_json_with_schema(FILE_PATH)

 

         216960 function calls in 1.099 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.534    0.534    1.098    1.098 1208675.py:14(read_json_with_schema)
   117407    0.492    0.000    0.492    0.000 {built-in method msgspec._core.json_decode}
    49772    0.047    0.000    0.047    0.000 {built-in method _codecs.utf_8_decode}
    49772    0.026    0.000    0.072    0.000 <frozen codecs>:319(decode)
        1    0.000    0.000    0.000    0.000 {built-in method io.open}
        1    0.000    0.000    1.099    1.099 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 {method '__exit__' of '_io._IOBase' objects}
        1    0.000    0.000    1.098    1.098 <string>:1(<module>)
        1    0.000    0.000    0.000    0.000 <frozen codecs>:309(__init__)
        1    0.000    0.000    0.000    0.000 interactiveshell.py:316(_modified_open)
        1    0.000    0.000    0.000    0.000 {

`msgspec` es tres veces más rápido que la librería estándar, al menos en el caso específico de nuestro archivo JSON.

## Soluciones alternativas
Por supuesto, hay otras formas de parsear datos que también pueden ser eficientes, tanto en tiempo de ejecución como en uso de memoria. Podemos, por ejemplo, usar `pandas`. Sin embargo, las soluciones aquí presentadas están pensadas específicamente para procesar nuestro archivo JSON, cuyos datos son semiestructurados, mientras que `pandas` es óptimo para trabajar con datos estructurados.

En este *challenge* no se usó `pandas` porque el sólo hecho de leer el archivo con `pd.read_json` causaba un incremento considerable tanto en uso de memoria como en tiempo de ejecución. Ahora bien, esto no significa que `pandas` no sea eficiente en cuanto a uso de memoria/tiempo de ejecución, sino que esto dependerá del volumen de datos y la estructura que estos tengan (si son estructurados o no, y si no, qué tan complejos o cuántos niveles de profundidad tienen).

Hagamos una prueba rápida tomando como ejemplo `q1` y codifiquemos su equivalente en `pandas`

In [18]:
from datetime import datetime
from typing import List, Tuple

import pandas as pd


def q1_pandas(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
    Gets the top 10 dates with the most tweets and the user with the most
    posts for each day while using pandas.

    Args:
        file_path: The path to the input JSON file.

    Returns:
        A list of tuples, where each tuple contains a date and its
            corresponding top user.
    """
    chunk_list = []
    chunksize = 10000
    for chunk in pd.read_json(file_path, lines=True, chunksize=chunksize):
        chunk["date"] = pd.to_datetime(chunk["date"]).dt.date
        chunk["username"] = chunk["user"].apply(lambda x: x["username"])
        chunk_list.append(chunk)

    df = pd.concat(chunk_list, ignore_index=True)
    df_grouped = df.groupby(["date", "username"]).size().reset_index(name='count')
    top_10_dates = df.groupby('date').size().nlargest(10).index

    results = []
    for date in top_10_dates:
        date_group = df_grouped[df_grouped['date'] == date]
        top_user = date_group.loc[date_group['count'].idxmax()]
        results.append((date, top_user['username']))

    return results


In [19]:
%prun q1_pandas_result = q1_pandas(FILE_PATH)

 

         948800 function calls (943186 primitive calls) in 9.151 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       12    4.448    0.371    4.500    0.375 {built-in method pandas._libs.json.ujson_loads}
     6012    0.782    0.000    0.785    0.000 datetimes.py:645(__iter__)
        1    0.613    0.613    9.151    9.151 <string>:1(<module>)
       13    0.539    0.041    7.375    0.567 _json.py:1085(__next__)
       12    0.283    0.024    0.435    0.036 construction.py:891(_list_of_dict_to_arrays)
       12    0.221    0.018    0.626    0.052 _json.py:971(_combine_lines)
       12    0.192    0.016    5.453    0.454 _json.py:1397(_parse)
       48    0.183    0.004    0.183    0.004 {method 'join' of 'str' objects}
      108    0.176    0.002    0.284    0.003 managers.py:2276(_merge_blocks)
   117408    0.170    0.000    0.170    0.000 {method 'strip' of 'str' objects}
       96    0.163    0.002    0.169    0.002 dat

Más de la mitad del tiempo de ejecución lo acapara la lectura del archivo JSON, aun cuando se está leyendo por *chunks*. `pandas` es una excelente herramienta, pero para este caso en específico, no es la opción más rápida debido al tiempo que se necesita para leer los datos.

En vez, en la solución `q1_time` optamos por trabajar con arreglos de `numpy`; a cambio de un uso mayor de memoria conseguimos el procesamiento de datos sin tener que iterar con un bucle `for` como en `q1_memory`.

Por completez, validemos que los resultados con `pandas` sean consistentes con los obtenidos por las otras dos funciones.

In [20]:
print(q1_mem_result == q1_time_result == q1_pandas_result, q1_pandas_result)

True [(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


Cabe resaltar que la lectura línea por línea del archivo JSON (o bien *chunk* por *chunk*) se vuelve cada vez menos viable conforme el conjunto de datos crece. En nuestro caso es un archivo JSON relativamente pequeño, pero si tuviéramos que lidiar con un conjunto de datos cuyo volumen estuviera en el orden de GiB o incluso TiB, debemos acudir al procesamiento distribuido. Herramientas como `pyspark` o `apache_beam` serían la solución. Por supuesto, en estos casos tendríamos que aprovisionar un clúster; en GCP, por ejemplo, necesitaríamos usar Dataproc o Dataflow respectivamente.

Para efectos prácticos, corramos `apache_beam` en local sólo para ilustrar el ejemplo. `q3` se presta muy bien para construir un *pipeline* con dicha herramienta.

In [21]:
from json import loads

from apache_beam import CombinePerKey, Map, Filter, FlatMap, Pipeline
from apache_beam.transforms.combiners import Top
from apache_beam.io import ReadFromText


def run_beam_pipeline():

    with Pipeline() as pipeline:
        _ = (
            pipeline
            | "ReadFromText" >> ReadFromText(FILE_PATH)
            | "ConvertToDict" >> Map(lambda record: loads(record))
            | "FilterNulls" >> Filter(lambda record: record.get("mentionedUsers") is not None)
            | "GetUsers" >> FlatMap(lambda record: record["mentionedUsers"])
            | "GetUsernames" >> Map(lambda record: (record["username"], 1))
            | "CombinePerKey" >> CombinePerKey(sum)
            | "GetTop10" >> Top.Of(10, key=lambda record: record[1])
            | Map(print)  # Here goes your sink, something like WriteToText or WriteToBigQuery
        )

In [22]:
run_beam_pipeline()





[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


Nótese en el `stdout` que los resultados son consistentes con `q3_mem_result` y `q3_time_result`, lo cual comprueba que `apache_beam` es también una alternativa para hacer este procesamiento de datos, aunque esta solución sería más apropiada para correr en un ambiente distribuido para atender volúmenes de datos mucho más grandes. `apache_beam` sólo se debe correr en local con propósitos de desarrollo y pruebas.

## Conclusiones
Como conclusiones me gustaría enlistar las siguientes:
- Los resultados son consistentes entre las funciones que optimizan el uso de memoria y las que optimizan el tiempo de ejecución.
- Para las funciones que priorizan el bajo uso de memoria se evitó al máximo el uso de librerías externas y se procuró trabajar con las funciones estándar de python.
- Para las funciones que priorizan el tiempo de ejecución se puso especial enfásis en la lectura del archivo JSON, ya que se identificó que era en esa etapa donde el código toma mayor tiempo. Por lo mismo, se decidió usar la librería `msgspec`, la cual es una alternativa más rápida a la librería estándar `json`. Además, en `q1` se aprovechó el uso de arreglos `numpy` para evitar el uso de bucles al buscar los usuarios más activos.
- Las soluciones propuestas son específicas para el archivo JSON proporcionado. Con otros conjuntos de datos otras soluciones podrían ser viables tanto en términos de uso de memoria como en tiempos de ejecución, dependiendo del volumen y de la estructura de los datos, entre las cuales se mencionaron `pandas` y `apache_beam`.
- Procuré codificar de manera tal que el código se explique por sí solo. Dejé algunos comentarios donde lo vi necesario.