# LATAM - Challenge Data Engineer

*Autor: Iván Daniel Huerta Herrera*

*Versión: 1.0.0*

### Contexto

Se tiene la información de un conjunto de Tweets (de la red social Twitter) en un archivo de texto plano donde cada línea corresponde a un objeto JSON.

Se nos pide realizar una serie de tareas que serán descritas a continuación.

#### Problema 1

Encontrar las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días.

#### Problema 2

Listar los top 10 emojis más usados con su respectivo conteo.

#### Problema 3

Encontrar el top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos

### Restricciones

Cada problema debe implementar las functiones contenidas en los archivos `q<X>_memory.py` y `q<X>_time.py`, donde `<X>` corresponde al número de pregunta. Estas funciones deben optimizar el uso de memoria o el tiempo de ejecución respectivamente.

## Consideraciones en la solución

Para la optimización de cada uno de los parámetros solicitados se considerará:

 - *Memoria*: El documento se leera en de línea en línea y no completo, y la información agregada se hará de forma directa.
 
 - *Tiempo*: Se utilizará como estrategia usar *MapReduce* con computación distribuida. De manera que las agregaciones se puedan realizar en paralelo. 

**Todo lo ejecutado en este desafío se realizó en un cluster de Databricks, pero si se tiene instalado spark localmente también se podría ejecutar ahí**

In [None]:
%sh

pip install memory-profiler==0.61.0

Collecting memory-profiler==0.61.0
  Downloading memory_profiler-0.61.0-py3-none-any.whl (31 kB)
Installing collected packages: memory-profiler
Successfully installed memory-profiler-0.61.0



[notice] A new release of pip available: 22.2.2 -> 24.1.2
[notice] To update, run: pip install --upgrade pip


### Inicialización de recursos importantes

In [None]:
file_path = "/Volumes/test/default/data/farmers-protest-tweets-2021-2-4.json"

mem_usage = {}
time_usage = {}

Antes de ver si se debe hacer o no limpieza de datos se verificará que no existan repetidos, el resto de la información se verificará con los schemas y los parsings que se hacen en los scripts

In [None]:
from utils.pyspark_mng import PySparkManager
from utils.pyspark_schema import tweet_schema

with PySparkManager() as spark:
    tweet_df = spark.read.json(file_path, schema=tweet_schema)
    rows = tweet_df.groupBy("id").count().orderBy(
        "count", ascending=False
    ).limit(1).show()

+-------------------+-----+
|                 id|count|
+-------------------+-----+
|1364486773540331522|    1|
+-------------------+-----+



Se crea una función que utiliza los decoradores para la medición de la memoría y el tiempo para ejecutar la función objetivo

In [None]:
from utils.profilers import mem_profiler, time_profiler

def run_function(func, tag, file): 
    @time_profiler(time_map=time_usage, tag=tag)
    @mem_profiler(mem_map=mem_usage, tag=tag)
    def wrapper(file):
        return func(file)
    return wrapper(file)

#### Problema 1

##### Resolución optimizando la memoria RAM

In [None]:
from q1_memory import q1_memory

run_function(
    lambda x: print(q1_memory(x)),
    "q1_memory",
    file_path
)

[(datetime.date(2021, 2, 19), 'Preetm91'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 21), 'Surrypuria'), (datetime.date(2021, 2, 18), 'rebelpacifist'), (datetime.date(2021, 2, 19), 'KaurDosanjh1979'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 15), 'jot__b')]


##### Resolución optimizando el tiempo

In [None]:
from q1_time import q1_time

run_function(
    lambda x: print(q1_time(x)),
    "q1_time",
    file_path
)

[(datetime.date(2021, 2, 19), 'Preetm91'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 21), 'Surrypuria'), (datetime.date(2021, 2, 18), 'rebelpacifist'), (datetime.date(2021, 2, 19), 'KaurDosanjh1979'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 15), 'jot__b')]


#### Problema 2

##### Resolución optimizando la memoria RAM

In [None]:
from q2_memory import q2_memory

run_function(
    lambda x: print(q2_memory(x)),
    "q2_memory",
    file_path
)

[('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218), ('👇', 1108)]


##### Resolución optimizando el tiempo

In [None]:
from q2_time import q2_time

run_function(
    lambda x: print(q2_time(x)),
    "q2_time",
    file_path
)

[('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218), ('👇', 1108)]


En este problema se tomó en consideración que los emoji se contabilizan sólo en los mensajes del Tweet, y los Tweet relacionados a este no se tomaron en cuenta para evitar duplicar la información en caso de haber referencias múltiples.

Para los Emojis se uso la librería `emoji` indirectamente. Se copió uno de los archivos en utils y se agregó una función para ver si hay Emojis.

#### Problema 3

##### Resolución optimizando la memoria RAM

In [None]:
from q3_memory import q3_memory

run_function(
    lambda x: print(q3_memory(x)),
    "q3_memory",
    file_path
)

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


##### Resolución optimizando el tiempo

In [None]:
from q3_time import q3_time

run_function(
    lambda x: print(q3_time(x)),
    "q3_time",
    file_path
)

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


Para el caso de las menciones, se aprovechó la estructura de datos que Twitter (X) proporciona. De esta manera no se buscó hacer operaciones de búsqueda en el String o con expresiones regulares

#### Análisis de memoria

In [None]:
print(mem_usage["q1_memory"])

Filename: /Workspace/Users/ivan.huerta.h@gmail.com/.ide/Challenge-34db729e/src/utils/profilers.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25    212.1 MiB    212.1 MiB           1                   @mem_profile(stream=buffer)
    26                                                         def run_func():
    27    226.3 MiB     14.2 MiB           1                       result = func(*args, **kwargs)





In [None]:
print(mem_usage["q1_time"])

Filename: /Workspace/Users/ivan.huerta.h@gmail.com/.ide/Challenge-34db729e/src/utils/profilers.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25    226.3 MiB    226.3 MiB           1                   @mem_profile(stream=buffer)
    26                                                         def run_func():
    27    226.3 MiB      0.0 MiB           1                       result = func(*args, **kwargs)





In [None]:
print(mem_usage["q2_memory"])

Filename: /Workspace/Users/ivan.huerta.h@gmail.com/.ide/Challenge-34db729e/src/utils/profilers.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25    375.9 MiB    375.9 MiB           1                   @mem_profile(stream=buffer)
    26                                                         def run_func():
    27    375.9 MiB      0.0 MiB           1                       result = func(*args, **kwargs)





In [None]:
print(mem_usage["q2_time"])

Filename: /Workspace/Users/ivan.huerta.h@gmail.com/.ide/Challenge-34db729e/src/utils/profilers.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25    293.6 MiB    293.6 MiB           1                   @mem_profile(stream=buffer)
    26                                                         def run_func():
    27    293.6 MiB      0.0 MiB           1                       result = func(*args, **kwargs)





In [None]:
print(mem_usage["q3_memory"])

Filename: /Workspace/Users/ivan.huerta.h@gmail.com/.ide/Challenge-34db729e/src/utils/profilers.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25    290.5 MiB    290.5 MiB           1                   @mem_profile(stream=buffer)
    26                                                         def run_func():
    27    290.5 MiB      0.0 MiB           1                       result = func(*args, **kwargs)





In [None]:
print(mem_usage["q3_time"])

Filename: /Workspace/Users/ivan.huerta.h@gmail.com/.ide/Challenge-34db729e/src/utils/profilers.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    25    290.5 MiB    290.5 MiB           1                   @mem_profile(stream=buffer)
    26                                                         def run_func():
    27    290.5 MiB      0.0 MiB           1                       result = func(*args, **kwargs)






En el caso de la memoría hay que tener cuidado en el análisis, ya que existe información precargada, que son los 200MB que usan las funciones de Spark acá. Además no se considera los estados intermedios de uso de memoria.

Pregunta|Optimización|Incremento de Memoria[MiB]
--|--|--
1|memoria|14.2
1|tiempo|0
2|memoria|0
2|tiempo|0
3|memoria|0
3|tiempo|0

Se podría mejorar este análisis viendo con mayor granularidad de funciones el uso de memoria

#### Análisis de tiempo

In [None]:
time_usage["q1_memory"].split("\n")[:15]

['         3657335 function calls (3655608 primitive calls) in 36.110 seconds',
 '',
 '   Ordered by: cumulative time',
 '',
 '   ncalls  tottime  percall  cumtime  percall filename:lineno(function)',
 '        1    0.000    0.000   36.110   36.110 profilers.py:21(wrapper)',
 '        1    0.000    0.000   36.110   36.110 memory_profiler.py:1185(wrapper)',
 '        1    0.000    0.000   35.426   35.426 memory_profiler.py:759(f)',
 '        1    0.001    0.001   35.426   35.426 profilers.py:25(run_func)',
 '        1    0.000    0.000   35.425   35.425 command-2167256912143093-1932399044:4(wrapper)',
 '        1    0.000    0.000   35.425   35.425 command-2167256912143096-933597278:4(<lambda>)',
 '        1    0.005    0.005   35.425   35.425 q1_memory.py:11(q1_memory)',
 '        1   19.512   19.512   35.420   35.420 operations.py:111(aggregate_multiline_json)',
 '   117407    1.401    0.000    8.779    0.000 __init__.py:299(loads)',
 '   117407    1.193    0.000    7.284    0.000 dec

In [None]:
time_usage["q1_time"].split("\n")[:15]

['         10269 function calls (10179 primitive calls) in 4.670 seconds',
 '',
 '   Ordered by: cumulative time',
 '',
 '   ncalls  tottime  percall  cumtime  percall filename:lineno(function)',
 '        1    0.000    0.000    4.670    4.670 profilers.py:21(wrapper)',
 '        1    0.000    0.000    4.670    4.670 memory_profiler.py:1185(wrapper)',
 '        1    0.000    0.000    4.484    4.484 memory_profiler.py:759(f)',
 '        1    0.001    0.001    4.484    4.484 profilers.py:25(run_func)',
 '        1    0.000    0.000    4.484    4.484 command-2167256912143093-1932399044:4(wrapper)',
 '        1    0.000    0.000    4.484    4.484 command-2167256912143098-4103342859:4(<lambda>)',
 '        1    0.000    0.000    4.483    4.483 q1_time.py:7(q1_time)',
 '     14/7    0.000    0.000    4.467    0.638 instrumentation_utils.py:39(wrapper)',
 '       74    0.001    0.000    4.356    0.059 socket.py:691(readinto)',
 "       74    4.354    0.059    4.354    0.059 {method 'recv_into

In [None]:
time_usage["q2_memory"].split("\n")[:15]

['         19356616 function calls in 59.760 seconds',
 '',
 '   Ordered by: cumulative time',
 '',
 '   ncalls  tottime  percall  cumtime  percall filename:lineno(function)',
 '        1    0.000    0.000   59.760   59.760 profilers.py:21(wrapper)',
 '        1    0.000    0.000   59.760   59.760 memory_profiler.py:1185(wrapper)',
 '        1    0.000    0.000   59.748   59.748 memory_profiler.py:759(f)',
 '        1    0.001    0.001   59.748   59.748 profilers.py:25(run_func)',
 '        1    0.000    0.000   59.747   59.747 command-2167256912143093-1932399044:4(wrapper)',
 '        1    0.000    0.000   59.747   59.747 command-2167256912143101-217985823:4(<lambda>)',
 '        1    0.000    0.000   59.747   59.747 q2_memory.py:8(q2_memory)',
 '        1    9.936    9.936   59.747   59.747 operations.py:111(aggregate_multiline_json)',
 '   117407    1.591    0.000   36.268    0.000 operations.py:89(get_value)',
 '   117407   31.864    0.000   34.573    0.000 emoji.py:53(get_emojis)'

In [None]:
time_usage["q2_time"].split("\n")[:15]

['         18214 function calls (18101 primitive calls) in 10.951 seconds',
 '',
 '   Ordered by: cumulative time',
 '',
 '   ncalls  tottime  percall  cumtime  percall filename:lineno(function)',
 '        1    0.000    0.000   10.950   10.950 profilers.py:21(wrapper)',
 '        1    0.000    0.000   10.950   10.950 memory_profiler.py:1185(wrapper)',
 '        1    0.000    0.000   10.738   10.738 memory_profiler.py:759(f)',
 '        1    0.001    0.001   10.738   10.738 profilers.py:25(run_func)',
 '        1    0.000    0.000   10.737   10.737 command-2167256912143093-1932399044:4(wrapper)',
 '        1    0.000    0.000   10.737   10.737 command-2167256912143103-2517778622:4(<lambda>)',
 '        1    0.000    0.000   10.736   10.736 command-2167256912143145-2297665257:14(q2_time2)',
 '      138    0.005    0.000   10.633    0.077 java_gateway.py:1015(send_command)',
 '      138    0.005    0.000   10.624    0.077 clientserver.py:524(send_command)',
 '    27/15    0.001    0.000 

In [None]:
time_usage["q3_memory"].split("\n")[:15]

['         2264821 function calls in 27.160 seconds',
 '',
 '   Ordered by: cumulative time',
 '',
 '   ncalls  tottime  percall  cumtime  percall filename:lineno(function)',
 '        1    0.000    0.000   27.160   27.160 profilers.py:21(wrapper)',
 '        1    0.000    0.000   27.160   27.160 memory_profiler.py:1185(wrapper)',
 '        1    0.000    0.000   26.965   26.965 memory_profiler.py:759(f)',
 '        1    0.001    0.001   26.965   26.965 profilers.py:25(run_func)',
 '        1    0.000    0.000   26.964   26.964 command-2167256912143093-1932399044:4(wrapper)',
 '        1    0.000    0.000   26.964   26.964 command-2167256912143107-3977979164:4(<lambda>)',
 '        1    0.001    0.001   26.964   26.964 q3_memory.py:7(q3_memory)',
 '        1   13.020   13.020   26.963   26.963 operations.py:111(aggregate_multiline_json)',
 '   117407    1.373    0.000    8.544    0.000 __init__.py:299(loads)',
 '   117407    1.149    0.000    7.086    0.000 decoder.py:332(decode)']

In [None]:
time_usage["q3_time"].split("\n")[:15]

['         11887 function calls (11799 primitive calls) in 4.517 seconds',
 '',
 '   Ordered by: cumulative time',
 '',
 '   ncalls  tottime  percall  cumtime  percall filename:lineno(function)',
 '        1    0.000    0.000    4.517    4.517 profilers.py:21(wrapper)',
 '        1    0.000    0.000    4.517    4.517 memory_profiler.py:1185(wrapper)',
 '        1    0.000    0.000    4.299    4.299 memory_profiler.py:759(f)',
 '        1    0.001    0.001    4.299    4.299 profilers.py:25(run_func)',
 '        1    0.000    0.000    4.298    4.298 command-2167256912143093-1932399044:4(wrapper)',
 '        1    0.000    0.000    4.298    4.298 command-2167256912143109-2002491190:4(<lambda>)',
 '        1    0.000    0.000    4.298    4.298 q3_time.py:7(q3_time)',
 '    19/10    0.001    0.000    4.265    0.426 instrumentation_utils.py:39(wrapper)',
 '       85    0.003    0.000    4.157    0.049 java_gateway.py:1015(send_command)',
 '       85    0.003    0.000    4.151    0.049 clients

Se puede ver a través de los valores que salen en (run_func) una comparación de cada tiempo que demoró en ejecutarse el código:

Pregunta|Optimización|tiempo[s]
--|--|--
1|memoria|35.426
1|tiempo|4.484
2|memoria|59.748
2|tiempo|10.738
3|memoria|26.965
3|tiempo|4.299

Si bien la disminución se ve sustancial, mayormente es por la precarga del DataFrame en Spark. Aún así las de tiempo son soluciones escalables que pueden mejorar con la adición de más workers


#### Temas no revisados que si consideraría en un caso real

No se realizó análisis exhaustivo de los datos, ya que la fuente de Twitter es relativamente confiable. Además que en el manejo de errores no tiró problemas.

En un caso real se debería inspeccionar los datos, ver que los formatos sean los correspondientes (como con los esquemas de Spark), intentar repararlos cuando se pueda o eliminar los que puedan sumar complejidad al procesamiento