
<div style="text-align:center">
    <img src="https://asociacionaepi.es/wp-content/uploads/2022/10/Python-image-with-logo.png" />
</div>


# POOL #

### ¿Qué es la clase Pool?

La clase `Pool` se encuentra en el módulo `multiprocessing`, el cual es parte de la biblioteca estándar de Python. Esta clase se utiliza para crear un grupo de procesos secundarios que pueden ejecutar tareas de manera paralela.

El propósito principal de la clase Pool es permitir la ejecución concurrente de múltiples procesos secundarios para realizar tareas de manera eficiente en sistemas multiprocesador o multinúcleo.

### Métodos Importantes de la clase Pool:
1. *apply()*
2. *map()*
3. *apply_async()*
4. *map_async()*
5. *close()*
6. *join()*
7. *terminate()*
8. *imap()*
9. *imap_unordered()*




## 1. COMO SE UTILIZA:

En primer lugar tenemos que importar la librería `multiporcessing` y crear una instancia de la clase `Pool`

In [3]:
from multiprocessing import Pool as pool

pool = pool()

In [2]:
import multiprocessing
pool = multiprocessing.Pool()

## 2. Métodos Importantes de la clase Pool:

### A) Método *`apply(función, args=(), kwds={}):`*:
Este método aplica la función dada a los argumentos proporcionados. Los argumentos `args` son una tupla de argumentos posicionales y `kwds` es un diccionario de argumentos de palabras clave. Retorna el resultado de la función.

Esto bloquea la ejecución del programa principal hasta que se complete la ejecución de la función en un proceso secundario y devuelve el resultado, que en este caso sería el factorial de 5.

Usando `args`

In [None]:
from multiprocessing import Pool

# Función para calcular el factorial de un número
def factorial(n):
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

if __name__ == "__main__":
    # Creamos un pool de procesos con 4 procesos
    with Pool(processes=4) as pool:
        # Calculamos el factorial de 5 usando apply
        result = pool.apply(factorial, (5,))

    # Imprimimos el resultado
    print("El factorial de 5 es:", result)


`El factorial de 5 es: 120`

Usando `kwds`

In [4]:
from multiprocessing import Pool

# Función para calcular el área de un rectángulo
def calculate_area(length, width):
    return length * width

if __name__ == "__main__":
    # Parámetros para el cálculo del área del rectángulo
    params = {"length": 5, "width": 3}

    # Creamos un pool de procesos con 2 procesos
    with Pool(processes=4) as pool:
        # Aplicamos la función calculate_area con los parámetros
        # usando el argumento kwds para pasar los argumentos de palabras clave
        # () indica que no existen argumentos args posicionales
        result = pool.apply(calculate_area, (), params)

    # Imprimimos el resultado
    print("El área del rectángulo es:", result)

`El área del rectángulo es: 15`

### B) Método *`map(función, iterable)`*:
Este método aplica la función dada a cada elemento del iterable, de forma similar a la función `map` de Python estándar. Los elementos se distribuyen automáticamente entre los procesos secundarios en el pool para su procesamiento en paralelo. Retorna una lista de resultados en el mismo orden que el iterable.

In [None]:
from multiprocessing import Pool

# Función para calcular el cuadrado de un número
def square(x):
    return x * x

if __name__ == "__main__":
    # Lista de números
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # Creamos un pool de procesos con 4 procesos
    with Pool(processes=4) as pool:
        # Utilizamos map para aplicar la función square a cada número en paralelo
        squared_numbers = pool.map(square, numbers)

    # Imprimimos los resultados
    print("Números originales:", numbers)
    print("Números al cuadrado:", squared_numbers)

`Números originales: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`
</br>
`Números al cuadrado: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]`

### C) Método *`apply_async()`*:
Igual que el metodo `apply`, pero es un método asíncrono, lo que significa que devuelve un objeto AsyncResult de inmediato sin bloquear el proceso principal. El proceso principal puede continuar ejecutando otras tareas mientras se ejecuta la función en el proceso secundario.

In [None]:
from multiprocessing import Pool

# Función simple para calcular el cuadrado de un número
def square(x):
    return x * x

if __name__ == "__main__":
    # Creamos un pool de procesos con 2 procesos
    with Pool(processes=2) as pool:
        # Usamos apply_async para calcular el cuadrado de un número de manera asíncrona
        result_async = pool.apply_async(square, (1000000,))
        
        # El flujo de ejecución del programa principal no espera
        # continuamos haciendo otras operaciones mientras se realiza el cálculo
        print("Esto se imprime antes de obtener el resultado de apply_async")
        
        # Obtenemos el resultado cuando esté disponible
        result = result_async.get()
        print("Resultado de apply_async:", result)

`Esto se imprime antes de obtener el resultado de apply_async`
</br>
`Resultado de apply_async: 1000000000000`

### D) Método *`map_async(función, iterable, callback=None, error_callback=None)`*:
El método map_async() de la clase Pool en Python es similar al método map(), pero funciona de manera asíncrona, lo que significa que no bloquea el proceso principal mientras se están procesando las tareas en los procesos secundarios.

In [None]:
from multiprocessing import Pool

# Función para calcular la suma de los dígitos de un número
def sum_digits(n):
    total = sum(int(digit) for digit in str(n))
    return total

# Función de callback para manejar los resultados de las tareas completadas
def handle_result(result):
    print("Resultado de la tarea:", result)

if __name__ == "__main__":
    # Creamos un pool de procesos con 2 procesos
    with Pool(processes=2) as pool:
        # Lista de números
        numbers = [1234, 5678, 91011, 121314]

        # Aplicamos la función sum_digits a cada número de manera asíncrona con map_async
        async_result = pool.map_async(sum_digits, numbers, callback=handle_result)

        # Esperamos a que todas las tareas asíncronas se completen
        async_result.wait()

    print("Todas las tareas se han completado")

`Resultado de la tarea: [10, 26, 12, 12]`
</br>
`Todas las tareas se han completado`

### E) Método *`close()`*:
Este método cierra el pool, lo que significa que ya no se pueden agregar nuevas tareas al mismo. Sin embargo, los procesos secundarios que ya se están ejecutando continuarán hasta que completen su trabajo.

In [None]:
# Función que realiza un cálculo intensivo
def heavy_computation(x):
    # Simulamos una tarea intensiva que toma algún tiempo
    time.sleep(1)
    return x * x

if __name__ == "__main__":
    # Creamos un pool de procesos con 2 procesos
    with Pool(processes=2) as pool:
        # Creamos una lista de números
        numbers = [1, 2, 3, 4, 5]
        
        # Aplicamos la función heavy_computation a cada número de manera asíncrona
        result_async = pool.map_async(heavy_computation, numbers)
        
        # Llamamos al método close() para indicar que no se aceptarán más tareas
        pool.close()
        
        # Esperamos a que todas las tareas asíncronas se completen
        result_async.wait()

    print("Todas las tareas se han completado")

El problema con este enfoque es que no llamamos al método `close()` antes de esperar a que todas las tareas se completen. Esto significa que el pool aún está abierto y puede aceptar más tareas mientras esperamos. Si esto sucede, el programa se bloqueará indefinidamente, ya que `wait()` no retornará hasta que todas las tareas en el pool estén completas, lo que incluye las tareas adicionales que podrían agregarse mientras esperamos.

### F) Método `*join()*`:
Usado para los metodos asíncronos.

El método `join()` en el contexto de la clase Pool en Python se utiliza para esperar a que todas las tareas asíncronas que se hayan lanzado en el pool se completen antes de continuar con la ejecución del programa principal.

Cuando se llama al método `join()` en un objeto AsyncResult retornado por un método como `map_async` o `apply_async`, el flujo de ejecución del programa principal se bloqueará hasta que todas las tareas asociadas con ese objeto AsyncResult se completen.

In [None]:
from multiprocessing import Pool

# Función de ejemplo que realiza una tarea intensiva
def heavy_computation(x):
    # Simulamos una tarea intensiva que toma algún tiempo
    import time
    time.sleep(1)
    return x * x

if __name__ == "__main__":
    # Creamos un pool de procesos con 2 procesos
    with Pool(processes=2) as pool:
        # Aplicamos la función heavy_computation a una lista de números de manera asíncrona
        result_async = pool.map_async(heavy_computation, [1, 2, 3, 4, 5])
        
        # Esperamos a que todas las tareas asíncronas se completen antes de continuar
        result_async.join()

    print("Todas las tareas se han completado")

<img src = "https://th.bing.com/th/id/R.f0d9714ad9440054354f205a3b7c6f06?rik=WiQMdaEfe4HC9g&riu=http%3a%2f%2fsimpleicon.com%2fwp-content%2fuploads%2fwarning.png&ehk=lylsb1QR0ttNcAa3CrMzHZvHPHPxdoJ0Ee8y%2by75xWY%3d&risl=&pid=ImgRaw&r=0" style="width:50px;height:50px"/>

### Diferencia entre `wait()` y `join()`


En ambos casos, el metodo espera a que  un hilo termine su ejecución. La principal diferencia radica en cómo se comportan:
- *`wait()`*: espera a que todas las tareas se realicen antes de proseguir con la ejecución. Permite cancelar la ejecución por ejemplo estableciendo un `timeout()`, lo que devolvera un error `TimeoutException`
- *`join()`*: espera a que todas las tareas se realicen antes de proseguir con la ejecución. No finzaliza hasta que se realicen todas las tareas.

### G) Método *`terminate()`*:

Cuando llamas al método `terminate()` en un objeto Pool, todos los procesos secundarios del pool se detienen inmediatamente y se liberan los recursos asociados a ellos. Esto significa que cualquier tarea que esté en curso o en espera dentro del pool será interrumpida y no se completará. Es importante tener en cuenta que esto puede resultar en la pérdida de datos o en un estado inconsistente, por lo que se debe usar con precaución. El resto del programa sigue su ejecución.

### H) Método *`imap(función, iterable)`* y *`imap_unordered(función, iterable)`*:

La función `imap()` toma dos argumentos principales: la función que se aplicará a cada elemento del iterable y el iterable en sí mismo. La función especificada se ejecutará en paralelo en los procesos del pool, aplicándose a cada elemento del iterable de forma independiente.

La diferencia principal entre `imap()` y `map()` radica en que imap devuelve un iterador que genera los resultados a medida que están disponibles, en lugar de esperar a que todos los resultados se calculen antes de devolverlos todos juntos. Esto puede ser útil para procesar grandes conjuntos de datos de manera eficiente o para trabajar con resultados en tiempo real.

In [None]:
from multiprocessing import Pool

# Función de ejemplo

def square(x):
    return x * x

if __name__ == "__main__":
    # Creamos un pool de procesos con 2 procesos
    with Pool(processes=2) as pool:
        # Aplicamos la función square a cada número de manera asíncrona usando imap
        result_iterator = pool.imap(square, [1, 2, 3, 4, 5])
        
        # Iteramos sobre el iterador para obtener los resultados
        for result in result_iterator:
            print("Resultado:", result)

    print("Todas las tareas se han completado")

`Resultado: 1`
</br>
`Resultado: 4`
</br>
`Resultado: 9`
</br>
`Resultado: 16`
</br>
`Resultado: 25`

La diferencia principal de `imap()` e `imap_unordered()` es que este último, a diferencia de `imap()` no asegura que los resultados se devuelvan en el orden en el que se aplica sobre el iterable.

# 3. EJERCICIO DE EJEMPLO DE CLASE POOL:

In [None]:
from multiprocessing import Pool
from functools import reduce

# Función para procesar una línea y calcular la suma de los números
def procesar_linea(linea):
    numeros = list(map(int, linea.strip().split(',')))
    suma = sum(numeros)
    return suma

if __name__ == '__main__':
    # Nombre del archivo
    nombre_archivo = 'datos_grandes.txt'


    # Abrir el archivo y leer todas las líneas
    with open(nombre_archivo, 'r') as archivo:
        lineas = archivo.readlines()

    # Crear un pool de procesos con el número de procesos determinado
    with Pool(processes=4) as pool:
        # Método map para procesar líneas
        resultados_map = pool.map(procesar_linea, lineas)

        # Método apply_async para procesar una línea específica de manera asincrónica
        resultado_apply_async = pool.apply_async(procesar_linea, (lineas[0],))

        # Método map_async para procesar líneas de manera asincrónica
        resultados_map_async = pool.map_async(procesar_linea, lineas)

        # Método apply para procesar una línea específica de manera síncrona
        resultado_apply = pool.apply(procesar_linea, (lineas[1],))

        # Combinar los resultados
        suma_total = reduce(lambda x, y: x + y, resultados_map)

        # Imprimir los resultados
        print("Resultado usando map:", resultados_map)
        print("Resultado usando apply_async:", resultado_apply_async.get())
        print("Resultado usando map_async:", resultados_map_async.get())
        print("Resultado usando apply:", resultado_apply)
        print("Suma total:", suma_total)

`Resultado usando map: [10, 26, 42]`
</br>
`Resultado usando apply_async: 10`
</br>
`Resultado usando map_async: [10, 26, 42]`
</br>
`Resultado usando apply: 26`
</br>
`Suma total: 78`