# Paralelización en Python

## 1. Introduccion

Ray es un proyecto open source para paralelismo y sistemas distribuidos en Python. Es un framework para propositos generales para programar clusters.

Basado en: https://rise.cs.berkeley.edu/blog/ray-tips-for-first-time-users/

<b>Características del equipo:</b><br>
Este experimento se realizo sobre una HP Pavilion TS 15 Notebook PC, 4 Core modelo: AMD A10-5745M 1100/2100 MHz, 10.9 GiB RAM. SO: Linux Mint 20.1 Ulyssa.

## 2. Paralelización en un Cluster

Se paralelizará una función que símplemente produce una demora de 1 segundo y se mide la demora global para cada caso. 

In [1]:
# Librerias basicas
import time
import ray
import numpy as np
# Caracteristicas del Cluster
num_cores=4

### 2.1. Realización de una tarea sin paralelizar

In [3]:
# La función:
def hacer_algo(x):
    time.sleep(1) # Reemplazar esto por sus tareas
    return x
inicio = time.time()
resultados = []
# Ejecuto la función una vez por cada CPU:
for x in range(num_cores):
    resultados.append(hacer_algo(x))
print("duracion =", time.time() - inicio, "\nresultados = ", resultados)

duracion = 4.003793478012085 
resultados =  [0, 1, 2, 3]


<b>Observación:</b><br>
Se ejecuta la funcion "hacer_algo(x)" donde "x" es el número de CPU. Como la función produce una demora de 1 segundo, al no paralelizarse, la demora global es de: <i>num_cpus * demora(hacer_algo())= 4s</i> (demora: 4 segundos).

### 2.2. Realizar una tarea paralelizada

In [6]:
# Inicializacion Instancia Ray
# Nota: Si se ejecuta ray.init dos veces, se produce un error.
ray.init(num_cpus = num_cores)

duracion = 0.09058403968811035 
resultados =  [ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000), ObjectRef(16310a0f0a45af5cffffffffffffffffffffffff0100000001000000), ObjectRef(c2668a65bda616c1ffffffffffffffffffffffff0100000001000000), ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000)]


In [6]:
# La función con el Decorador:
@ray.remote
def hacer_algo(x):
    time.sleep(1) # Reemplazar esto por sus tareas
    return x
inicio = time.time()
resultados = []
# Ejecuto la función una vez por cada CPU remota:
for x in range(num_cores):
    resultados.append(hacer_algo.remote(x))
print("duracion =", time.time() - inicio, "\nresultados = ", resultados)

duracion = 0.09058403968811035 
resultados =  [ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000), ObjectRef(16310a0f0a45af5cffffffffffffffffffffffff0100000001000000), ObjectRef(c2668a65bda616c1ffffffffffffffffffffffff0100000001000000), ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000)]


<b>Observación:</b><br>
El tiempo total de ejecucion resulto ser de una fracción de segundo, lo cual no parece ser el resultado esperado (se esperaba algo más de 1 seg). Esto es porque devuelve el tiempo total para la invocación de las funciones, y no los tiempos de ejecución. Devuelve también, los IDs de cada uno de los objetos que contendrán los resultados tras la finalización del trabajo.

#### El metodo GET

Usando el método GET de RAY para obtener el tiempo total insumido.

In [10]:
inicio = time.time()
resultados = []
# Ejecuto la función una vez por cada CPU remota:
for x in range(num_cores):
    resultados.append(ray.get(hacer_algo.remote(x)))
print("duracion =", time.time() - inicio, "\nresultados = ", resultados)

duracion = 4.038443088531494 
resultados =  [0, 1, 2, 3]


### 2.3. Evitar paralelizar tareas muy pequeñas

Si se parlelizan gran cantidad de tareas muy pequeñas podemos tener efectos no deseados: a causa del tiempo mínimo que se usa para procesar la función, propagada por la cantidad de hilos produce una demora considerable.

In [31]:
cant_tareas=20000
# La función "hacer_algo_chico" con el Decorador:
@ray.remote
def hacer_algo_chico(x):
    time.sleep(0.0001) # Reemplazar por tu función
    return x
inicio = time.time()
resultados_ids = []
# Ejecuto la función una vez por cada CPU remota:
for x in range(cant_tareas):
    resultados_ids.append(hacer_algo_chico.remote(x))
resultado=ray.get(resultados_ids)
print("duracion =", time.time() - inicio)

duracion = 16.606837034225464


Observar que el tiempo total insumido es muy alto en comparación con lo que debería ser. Esto se debe al peso del procesamiento por hilo.<br>

#### Englobarlas en una tarea mayor

In [35]:
#Función mayor que engloba:
@ray.remote
def hacer_algo_grande(inicio, fin):
    resultados = []
    for x in range(inicio,fin):
        resultados.append(hacer_algo_chico.remote(x))
    return resultados
# Main:
inicio = time.time()
resultados_ids = []
iterac=int(cant_tareas/1000)
for x in range(iterac):
    resultados_ids.append(hacer_algo_grande.remote(x*int(cant_tareas/iterac), (x+1)*int(cant_tareas/iterac)))
resultados = ray.get(resultados_ids)
print("duracion =", time.time() - inicio)

duracion = 6.429934024810791


Puede observarse que el tiempo de procesamiento global se reduce al 30% en comparación con tareas pequeñas paralelizadas.

#### Sobrecarga por tarea:

¿Cuál es la sobrecarga por cada tarea?

In [36]:
# Funcion que no hace nada
@ray.remote
def hacer_nada(x):
    return x 
# Main;
inicio = time.time()
resultados_ids = []
for x in range(cant_tareas):
    resultados_ids.append(hacer_nada.remote(x))
resultados = ray.get(resultados_ids)
print("Sobregarga por tarea (ms) =", (time.time() - inicio)*1000/cant_tareas)

Sobregarga por tarea (ms) = 0.8735753655433655
