## Intro MapReduce y paralelismo
---

## Analítica de Grandes Datos
* ##### Facultad de Minas
* ##### Universidad Nacional de Colombia
* ##### Autor: Valentina Vásquez Hernandez

<img src="meme1.jpg" alt="drawing" width="300"/>

---

### 1. ¿Qué es una expresión regular en Python?

Una expresión regular se define "como una secuencia de comandos que especifica un patrón de busqueda en texto" [1]. Originalmente, se crean dos tipos de interpretacines: POSIX y Perl. Son utilizadas desde su nacimiento para procesar datos tipo texto y cuentan con una implementación en la mayoría de lenguajes de programación.


 Las expresiones regulares en Python se basan en Perl y es necesario importar la librería `re` para su uso. A continuación algunas consideraciones importantes:

**Consideraciones:**

* Instalar: Usualmente viene por defecto, en caso de no contar con este, instalar con el siguiente comando: `pip install re`. 
* Importar: `import re`
* Metacaracteres: `. ^ $ * + ? { } [ ] \ | ( )`
* Funciones principales: `re.match()`, `re.search()`
* `timeit`: función utlizada para medir el tiempo de ejecucicón de un fragmento de código. Por defecto, el fragmento principal de ejecutará 1.000.000 de veces, sin embargo, con el parámetro *numbers* es posible modificarlo


 *Documentación*: https://docs.python.org/3/library/re.html

**Ejemplo**

In [1]:
## Se importa la librería
import re
import glob
## Se leen los datos a los que se le aplicarán las funciones
data_logs = open("data/logs.csv", 'r')

In [2]:
!head data/logs.csv

id;hour;city_country
1;2000-07-11T08:57:17-07:00;Medellin Antioquia
2;2022-04-11T01:57:20-07:00;Medellin Antioquia
3;2000-07-11T08:57:17-07:01;medellin Antioquia
4;2022-04-11T01:57:20-07:01;Pasto Nariño
5;2000-07-11T03:57:17-07:02;Bogota Cundinamarca
6;2022-04-11T01:57:20-07:02;Bogota Cundinamarca
7;2000-07-11T08:57:17-07:03;Manizales Caldas
8;2022-04-11T01:14:20-07:03;Manisalez Caldas
9;2000-07-11T08:33:17-07:04;Medellin Antioquia


*PSL*

In [3]:
## Se establece el archivo en donde se van a guardar los datos formateados
file_new = open("data/logs_clean.csv", 'w')
file_new.write("id;date;hour;city;region\n")

primera_linea = True

for line in data_logs: 
    if primera_linea is True:
        primera_linea = False
        continue
        
    line = line.replace(';Manisalez',';Manizales')
    line = line.replace(';medellin',';Medellin')
    groups_hour = re.search('([0-9]+);([0-9]+-[0-9]+-[0-9]+)[a-zA-z]+([0-9]+:[0-9]+)', line)
    groups_city = re.search(';([a-zA-z]+) ([a-zA-z]+)', line)
    
    new_line =  groups_hour.group(1)+';'+groups_hour.group(2)+';'+groups_hour.group(3)+';'+groups_city.group(1)+';'+groups_city.group(2)+'\n'  
    line = line.replace(line,new_line)
    
    file_new.write(line)
    
    
data_logs.close()
file_new.close()

In [4]:
!head data/logs_clean.csv

id;date;hour;city;region
1;2000-07-11;08:57;Medellin;Antioquia
2;2022-04-11;01:57;Medellin;Antioquia
3;2000-07-11;08:57;Medellin;Antioquia
4;2022-04-11;01:57;Pasto;Nari
5;2000-07-11;03:57;Bogota;Cundinamarca
6;2022-04-11;01:57;Bogota;Cundinamarca
7;2000-07-11;08:57;Manizales;Caldas
8;2022-04-11;01:14;Manizales;Caldas
9;2000-07-11;08:33;Medellin;Antioquia


### 2. ¿Qué es el paralelismo?

Computación en paralelo o paralelismo es una arquitectura computacional que divide trabajos de gran envergadura en pequeñas tareas que se caracterizan por ser independientes y similares (con respecto al trabajo inicial). Estas tareas se ejecutan se manera secuencial, tomando un nodo por cada procesador del equipo actual (lo indica el usuario y es máximo el número de procesadores con los que cuente la máquina) o una red de procesadores (cluster).

> Esquema tradicional de procesamiento [1](https://hpc.llnl.gov/documentation/tutorials/introduction-parallel-computing-tutorial)


<img src="parallel1.jpg" alt="drawing" width="600"/>

> Esquema de procesamiento en paralelo [1](https://hpc.llnl.gov/documentation/tutorials/introduction-parallel-computing-tutorial)




<img src="parallel2.jpg" alt="drawing" width="600"/>

Para efectos del curso y los conceptos asociados al mismo, se trabajará con la librería `multiprocessing` que ofrece al usuario paralelizar procesos de manera local y remota. 

**Tips**

* La clase `Pool` permite ejecutar multiples tareas por proceso (procesador), por lo que no es necesario que el usuario administre por si mismo el número de tareas ni monitoree los procesos a usar. 
* La clase `Pool` no funciona en interpretes interactivos (Eje. Jupyter). Requiere del __main__ module.
* Se se utiliza `Pool` dentro de un interprete interactivo, la ejecución aparentemente se mostrará inconclusa y en el terminal se levantan los errores.
* Por defecto, la clase `Pool` utiliza como procesos todos los procesadores disponibles dentro de la máquina donde se ejecuta (os.cpu_count()). El usuario puede modificarlo con el parámetro *processes*.

*Documentación:* https://docs.python.org/3/library/multiprocessing.html 

In [5]:
##Cuenta cuántos procesadores tiene el equipo actual
import os
os.cpu_count()

8

### 3. ¿Qué es el algoritmo MapReduce?
Es un framework que se considera la base de la computación en paralelo que nació a principios de los años 80. El paper inicial fue publicado por Yahoo, sin embargo, la implementación aceptada por la comunidad, en OpenSource, la mantiene actualmente Apache. 

> Esquema tradicional MapReduce [2](https://www.researchgate.net/figure/The-MapReduce-architecture-MapReduce-Algorithm-There-are-four-steps-to-implement_fig2_305489358)


<img src="mapreduce.jpg" alt="drawing" width="600"/>


#### Paso 1. Map
Extraer una lista única de palabras dentro del dataset `data/logs_clean.csv`

In [7]:
import etl
from multiprocessing import Pool
from toolz.functoolz import compose
from toolz.itertoolz import concat

In [8]:
## Se aplica la función .split a todo el archivo leyendo linea por linea
with open("data/logs_clean.csv", "rt") as file:
    for line in file:
        print(line.split(";"))

['id', 'date', 'hour', 'city', 'region\n']
['1', '2000-07-11', '08:57', 'Medellin', 'Antioquia\n']
['2', '2022-04-11', '01:57', 'Medellin', 'Antioquia\n']
['3', '2000-07-11', '08:57', 'Medellin', 'Antioquia\n']
['4', '2022-04-11', '01:57', 'Pasto', 'Nari\n']
['5', '2000-07-11', '03:57', 'Bogota', 'Cundinamarca\n']
['6', '2022-04-11', '01:57', 'Bogota', 'Cundinamarca\n']
['7', '2000-07-11', '08:57', 'Manizales', 'Caldas\n']
['8', '2022-04-11', '01:14', 'Manizales', 'Caldas\n']
['9', '2000-07-11', '08:33', 'Medellin', 'Antioquia\n']
['10', '2022-04-11', '01:36', 'Medellin', 'Antioquia\n']


In [9]:
## Se aplica la función de usuario, de la función split_lines, a cada linea
with open("data/logs_clean.csv", "rt") as file:
    for line in file:
        print(etl.split_lines(line))

['id', 'date', 'hour', 'city', 'region\n']
['1', '2000-07-11', '08:57', 'Medellin', 'Antioquia\n']
['2', '2022-04-11', '01:57', 'Medellin', 'Antioquia\n']
['3', '2000-07-11', '08:57', 'Medellin', 'Antioquia\n']
['4', '2022-04-11', '01:57', 'Pasto', 'Nari\n']
['5', '2000-07-11', '03:57', 'Bogota', 'Cundinamarca\n']
['6', '2022-04-11', '01:57', 'Bogota', 'Cundinamarca\n']
['7', '2000-07-11', '08:57', 'Manizales', 'Caldas\n']
['8', '2022-04-11', '01:14', 'Manizales', 'Caldas\n']
['9', '2000-07-11', '08:33', 'Medellin', 'Antioquia\n']
['10', '2022-04-11', '01:36', 'Medellin', 'Antioquia\n']


In [10]:
## Se aplica la función de usuario a cada linea y se extraen los dos últimos campos
with open("data/logs_clean.csv", "rt") as file:
    for line in file:
        print(line.split(";")[-2]+";"+line.split(";")[-1])

city;region

Medellin;Antioquia

Medellin;Antioquia

Medellin;Antioquia

Pasto;Nari

Bogota;Cundinamarca

Bogota;Cundinamarca

Manizales;Caldas

Manizales;Caldas

Medellin;Antioquia

Medellin;Antioquia



In [11]:
## Se aplica la función split_lines a cada linea que retorna la función load_data
logs_clean_list = map(etl.split_lines, etl.load_data("data/logs_clean.csv"))
logs_clean_list

<map at 0x7f7dcf662e20>

In [12]:
## Retorna un objeto iterable
for i in logs_clean_list:
    print(i)

['Medellin', 'Antioquia\n']
['Medellin', 'Antioquia\n']
['Medellin', 'Antioquia\n']
['Pasto', 'Nari\n']
['Bogota', 'Cundinamarca\n']
['Bogota', 'Cundinamarca\n']
['Manizales', 'Caldas\n']
['Manizales', 'Caldas\n']
['Medellin', 'Antioquia\n']
['Medellin', 'Antioquia\n']


In [13]:
## Se aplican todas las funciones de preprocesamiento que se encuentran el `etl` con Pool
with Pool(processes=6) as pool:
    result = pool.map(etl.split_lines, etl.load_data("data/logs_clean.csv"))
    result = concat(result)
    result = pool.map(etl.remove_newline, result)
    result = pool.map(etl.remove_punctuation, result)
    result = pool.map(etl.tolower, result)

In [14]:
result

['medellin',
 'antioquia',
 'medellin',
 'antioquia',
 'medellin',
 'antioquia',
 'pasto',
 'nari',
 'bogota',
 'cundinamarca',
 'bogota',
 'cundinamarca',
 'manizales',
 'caldas',
 'manizales',
 'caldas',
 'medellin',
 'antioquia',
 'medellin',
 'antioquia']

In [15]:
## Se utiliza `compose` para definir el pipeline de la etl
compose_pipeline = compose(
    etl.remove_newline,
    etl.remove_punctuation,
    etl.tolower
)

with Pool() as pool:
    result = pool.map(etl.split_lines, etl.load_data("data/logs_clean.csv"))
    result = concat(result)
    result = pool.map(compose_pipeline, result)

result

['medellin',
 'antioquia',
 'medellin',
 'antioquia',
 'medellin',
 'antioquia',
 'pasto',
 'nari',
 'bogota',
 'cundinamarca',
 'bogota',
 'cundinamarca',
 'manizales',
 'caldas',
 'manizales',
 'caldas',
 'medellin',
 'antioquia',
 'medellin',
 'antioquia']

#### Paso 2. Reduce
Contar las repeticiones de las palabras presentes en el dataset `data/logs_clean.csv`

In [16]:
## Se cuentas las repeticiones que hay dentro de result
from functools import reduce
reduce(
    etl.make_counts,
    result,
    {},
)

{'medellin': 5,
 'antioquia': 5,
 'pasto': 1,
 'nari': 1,
 'bogota': 2,
 'cundinamarca': 2,
 'manizales': 2,
 'caldas': 2}

#### MapReduce
Mapear las palabras y realizar el conteo las repeticiones de las palabras presentes en el dataset `data/logs_clean.csv`

In [17]:
from toolz.sandbox.parallel import fold

compose_pipeline = compose(
    etl.remove_newline,
    etl.remove_punctuation,
    etl.tolower
) 
with Pool() as pool:

    result = pool.map(etl.split_lines, etl.load_data("data/logs_clean.csv"))
    result = concat(result)
    result = pool.map(compose_pipeline, result)
    result = fold(
        etl.make_counts,
        result,
        {},
        pool.map,
        10000,
        etl.sum_counts,
    )

In [18]:
result

{'medellin': 5,
 'antioquia': 5,
 'pasto': 1,
 'nari': 1,
 'bogota': 2,
 'cundinamarca': 2,
 'manizales': 2,
 'caldas': 2}

---