# Ejemplos de Paralelización y Tips Rendimiento Big Data
En el primer ejemplo revisamos como lanzar 4 tareas(threads) en paralelo

**Nota**: Jupyter no es óptimo para correr tareas en paralelo, en particular la versión de Windows de Ipython no extrae el máximo provecho de las librerias de threading, se recomienda usar python desde la consola para grandes volumenes de datos/procesamiento

### Ejemplo 1

In [None]:
import threading
import random

In [None]:
def worker(count):
    print('Trabajo %d lanzado! Resultado=%f'%(count,random.random()))
    return

In [None]:
threads = list()
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
    
for tt in threads:
    tt.join()
print("todos los trabajos terminaron")

### Ejemplo 2
Librerias como numpy ya contemplan en muchas de sus funciones optimizaciones y threads para acelerar el cómputo.
En este caso veremos la función producto punto

In [None]:
import numpy as np

In [None]:
np.random.seed(1)
n=1000000
a=np.random.rand(n)
b=np.random.rand(n)

#### proceso serial

In [None]:
%%time 
i=0
pdot=0;
while i<n:
   pdot+=a[i]*b[i] 
   i=i+1
print('Resultado =',pdot)

#### con numpy que debajo utiliza librerias de aceleracion como BLAS
[https://es.wikipedia.org/wiki/Basic_Linear_Algebra_Subprograms](https://es.wikipedia.org/wiki/Basic_Linear_Algebra_Subprograms)

In [None]:
%%time
pdot=np.dot(a,b)
pdot

### Ejemplo 3 
Threads para una funcion mas compleja, ordenar un vector random.

En general ordenar un vector de n elementos requiere nlog(n) operaciones.

[https://en.wikipedia.org/wiki/Sorting_algorithm](https://en.wikipedia.org/wiki/Sorting_algorithm)

In [None]:
from multiprocessing import Pool
from timeit import default_timer as timer
import functions

Contenido de functions.py

`
import numpy as np
def createandsort (n):
 rand = np.random.RandomState(42) #semilla para reproducir mismos numeros
 a = rand.rand(n) #array de n elementos random
 return a.sort() #Ordena de menor a menor (nlogn nro de operaciones)
`

In [None]:
vector_size = 10000000
#Creamos sizes con 3 arrays
sizes = [vector_size for i in range(0,3)] #cada array es de tamaño vector_size

In [None]:
#usando funcion secuencial
tic = timer()
[functions.createandsort(size) for size in sizes]
tac = timer()
print("tiempo secuencial: ", tac-tic)


In [None]:
#usando pool multiprocessing
if __name__ == "__main__":
    pool = Pool(processes=3)
    tic = timer()
    pool.map(functions.createandsort,sizes)
    tac = timer()
    print("tiempo paralelo: ",tac-tic)

### Ejemplo 4. 
Lectura y manejo de datos Pandas y Dask.

Revisamos como leer un archivo de datos csv grande con pandas, y con dask que utiliza paralelizacion para acelerar funciones.

[https://dask.org/](https://dask.org/)

Primero creamos un archivo dummy con 1 millon de filas

In [None]:
import pandas as pd
import dask.dataframe as dd

In [None]:
nrows = 1000000
val=np.random.rand(nrows,5)
df = pd.DataFrame(data=val,columns=["col1","col2","col3","col4","col5"])
df.to_csv("dummy.csv",header=True)
df.describe()

Lectura con Pandas

In [None]:
%%time
df2=pd.read_csv("dummy.csv")


Lectura con Dask

In [None]:
%%time
df3=dd.read_csv("dummy.csv")

Reemplazo con pandas

In [None]:
%%time
res=df2['col1'].mask(df2['col1']>0.5,1.0)

Reemplazo con Dask

In [None]:
%%time
res=df3['col1'].mask(df3['col1']>0.5,1.0)

### Uso de Memoria

Para grandes volumenes de datos es importante el consumo de memoria RAM.

Es importante manejar los tipos de datos minimos necesarios para representar la información

In [None]:
df2.dtypes

Pandas por defecto

In [None]:
mb = df2.memory_usage(index=False,deep=True).sum() / 1024 /1024
print('Uso memoria %d MB'%mb)

Pandas definiendo variables de 16bits para cada float

In [None]:
df4=pd.read_csv("dummy.csv",dtype={"col1":"float16","col2":"float16","col3":"float16","col4":"float16","col5":"float16"})

In [None]:
mb = df4.memory_usage(index=False,deep=True).sum() / 1024 /1024
print('Uso memoria %d MB'%mb)