# Multiprocessing

## Tabla de contenidos
***



***

Aquí se revisará como se puede usar directamente múltiples threads o procesos para acelerar el código y que riesgos hay que tener en mente.

El módulo `threading` hace posible el correr código en paralelo en un único proceso. Esto hace el threading muy útil para tareas de I/O como leer/escribir sobre archivos o comunicación en redes, pero una opción inútil para cálculos pesados y lentos, donde el módulo `multiprocessing` brilla.

Con el módulo `multiprocessing`, se puede correr código en múltiples procesos, lo que significa que se puede correr código en múltiples cores de GPU, múltiples procesadores e incluso múltiples computadores.

El módulo `threading` es básico, en el sentido de que se tienen que crear y manejar los threads de forma manual. Para esto, se tiene el módulo `concurrent.futures`, que ofrece una manera simple de ejecutar una lista de tareas ya sea a través de threads o procesos.

## The Global Interpreter Lock (GIL)

El GIL es un bloqueo global (global lock) para el intérprete de Python, para que pueda ejecutar solo una instrucción a la vez. Un **lock** o **mutex (mutual exclusion)** en computación paralela es una sincronización primitiva que puede bloquear la ejecución paralela. Con un lock, se asegura que nadie puede tocar la variable mientras se está trabajando en ella.

Python ofrece diversas maneras de sincronización primitivas, somo `threading.Lock` y `threading.Semaphore`. Incluso con el módulo `threading`, solo se está ejecutando una sola instrucción a la vez en Python.

## El uso de múltiples threads

`threading` puede brindar muchos beneficios si se está esperando a recursos externos.

Ventajas de `asyncio` sobre `threading`:
- `asyncio` es generalmente más rápida que `threading` porque no hay sincronización de threads.
- Dado que `asyncio` es normalmente *single-threaded*, no hay que preocuparse de *thread safety*.

## ¿Por qué se necesita el GIL?

El GIL, es actualmente una parte esencial del intérprete de CPython porque se asegura de que el manejo de memoria es siempre consistente. Como el GIL se asegura de que una sola instrucción de Python se puede ejecutar simultáneamente, nunca hay problemas donde múltiples bits de código manipulan memoria la mismo tiempo, o donde memoria está siendo liberada al sistema que actualmente no está disponible.

## Múltiples threads y procesos

El módulo `multiprocessing` ha hecho bastante fácil el trabajar alrededor de las limitaciones del GIL porque cada proceso tiene su propio GIL.

El uso del módulo `multiprocessing` es bastante similar al del módulo `threading` pero tiene muchas características muy útiles que hacen mucho más sentido con múltiples procesos. De forma alternativa, se puede usar con `concurrent.futures.ProcessPoolExecutor`, que tiene una interfaz casi idéntica a `concurrent.futures.ThreadPoolExecutor`

**IMPORTANTE: Debe ser consciente de que es crítico el poner en el código `if __name__ == '__main__'` cuando use `multiprocessing`. Cuando este módulo lanza los procesos extra de Python, va a ejecutar el mismo script de Python, así que sin este bloque de código usted va a terminar en un loop infinito de procesos que inician.**

## Ejemplos básicos

Para crear threads y prcoesos, se tienen diversas opciones:
- `concurrent.futures`: Una interfaz fácil de usar para correr funciones ya sea en threads o procesos, similar a `asyncio`.
- `threading`: Una interfaz para crear threads de forma directa.
- `multiprocessing`: Una interfaz con mucha utilidad y funciones convenientes para crear y manejar múltiples procesos de Python.

## concurrent.futures

In [1]:
import time
import concurrent.futures

def timer(name, steps, interval = 0.1):
    '''funcion timer que duerme steps * interval'''
    for step in range(steps):
        print(name, step)
        time.sleep(interval)
        

if __name__ == '__main__':
    #Reemplazar con concurrent.futures.ProcessPoolExecutor para
    #múltiples procesos en vez de threads
    
    with concurrent.futures.ThreadPoolExecutor() as executor:
        #Entregar la función a executor con algunos argumentos
        executor.submit(timer, steps = 3, name = 'a')
        
        #Dormir un poquito, para mantener el orden del output consistente
        time.sleep(0.1)
        executor.submit(timer, steps = 3, name = "b")

a 0
a 1
b 0
b 1
a 2
b 2


Primero se creó una función `timer` que corre `time.sleep(interval)` y lo hace `steps` veces. Antes de dormir, printea el `nombre` y el `step` actual así podemos ver fácilmente que es lo que está pasando. Luego, creamos executor usando `concurrent.futures.ThreadPoolExecutor` para ejecutar las funciones. Finalmente, entregamos las funciones que queremos ejecutar con sus respectivos argumentos para empezar ambos threads. Entre medio, dormimos por un pequeño intervalo de tiempo, así el output es consistente.

## threading

In [2]:
import time
import threading

def timer(name, steps, interval = 0.1):
    '''funcion timer que duerme steps * interval'''
    for step in range(steps):
        print(name, step)
        time.sleep(interval)
        
# Se crean los threads de forma declarativa
a = threading.Thread(target = timer, kwargs = dict(name = "a", steps = 3))
b = threading.Thread(target = timer, kwargs = dict(name = "b", steps = 3))

#Se empiezan los threads
a.start()

#Se duerme un poquito
time.sleep(0.1)
b.start()

a 0
a 1
b 0
ab 1
 2
b 2


La función `timer` es idéntica. En este caso creamos los threads instanciando `threading.Thread()` directamente, pero heredar de `threading.Thread` es también una opción. Los argumentos a la función objetivo pueden ser dados, pasando args/kwargs argumentos, pero estoy son opcionales si no se tiene necesidad de usarlos o si se han prellenado usando `functools.partial`.

Aquí estamos creando explícitamente los threads para correr una sola funcion y salir tan pronto como su tarea haya terminado. Esto es útil para threads que corren durante largos períodos, dado que este método requiere setear el thread para cada función.

In [4]:
import time
import threading

class Timer(threading.Thread):
    def __init__(self, name, steps, interval = 0.1):
        self.steps = steps
        self.interval = interval
        #threading.Thread tiene un nombre built- in
        #Be careful not to manually override it
        super().__init__(name = name)
        
        
    def run(self):
        '''funcion timer que duerme steps * interval'''
        for step in range(self.steps):
            print(self.name, step)
            time.sleep(self.interval)

a = Timer(name = "a", steps = 3)
b = Timer(name = "b", steps = 3)

a.start()

time.sleep(0.1)
b.start()

a 0
a 1
b 0
b 1
a 2
b 2


Diferencias críticas a tener en consideración:
- `name` es un atributo reservado para `threading.Thread`.
- La función target por defecto es `run()`. Sea cuidadoso de sobreescribir el método `run()` en vez del método `start()`, de lo contrario el código no se va a ejecutar en un thread aparte, pero se ejecutará como una función regular cuando se llama el método `start()`.

## multiprocessing

Ejecute los siguientes códigos no en el notebook

In [6]:
import time
import multiprocessing

def timer(name, steps, interval = 0.1):
    '''funcion timer que duerme steps * interval'''
    for step in range(steps):
        print(name, step)
        time.sleep(interval)
        
        
if __name__ == "__main__":
    #Se crean los procesos de forma declarativa
    a = multiprocessing.Process(target = timer, kwargs = dict(name = "a", steps = 3))
    b = multiprocessing.Process(target = timer, kwargs = dict(name = "b", steps = 3))
    
    #Se comienzan los procesos
    a.start()
    #Se duerme un poquito
    time.sleep(0.1)
    b.start()

A continuación se adjunta la versión basada en OOP

In [None]:
import time
import multiprocessing

class Timer(multiprocessing.Process):
    def __init__(self, name, steps, interval = 0.1):
        self.steps = steps
        self.interval = interval
        
        super().__init__(name = name)
        
    
    def run(self):
        '''funcion timer que duerme steps * interval'''
        for step in range(self.steps):
            print(self.name, step)
            time.sleep(self.interval)
            
if __name__ == "__main__":
    a = Timer(name = "a", steps = 3)
    b = Timer(name = "b", steps = 3)
    
    a.start()
    time.sleep(0.1)
    b.start()

## Salir limpiamente de procesos y threads de ejecución prolongada

El módulo `threading` es más que nada útil para threads que se ejecutan de manera prolongada que manejan un recurso externo. Algunos ejemplos son:

- Cuando se crea un servidor y se quiere estar atento para nuevas conexiones
- Cuando se conecta a HTTP WebSockets y se necesita que la conexión se mantenga abierta
- Cuando se necesitan guardar ciertos cambios de forma periódica

En algún punto, puede que necesite terminar un thread **afuera** desde fuera del thread, durante la salida del script principal, por ejemplo. Esperar un thread que se termina por si solo es trivial; la única cosa que se debe hacer es `future.result()` o `some_thread.join(timeout = ....)` y listo. La parte difícil es decirle al thread que termine su ejecución y correr la limpieza mientras se sigue haciendo otra cosa.

La única solución real para esto, que aplica si tiene suerte, es un simple `while loop` que corre hasta que se le da una señal de stop.

In [None]:
import time
import threading

class Forever(threading.Thread):
    def __init__(self):
        self.stop = threading.Event()
        super().__init__()
        
    def run(self):
        while not self.stop.is_set():
            #Haga lo que sea que necesite
            time.sleep(0.1)
            
thread = Forever()
thread.start()
#Haga lo que sea que necesite
thread.stop.set()
thread.join()

Este código usa `threading.Event()` como una bandera para decirle al thread cuando la salida es necesaria.

El escenario ideal es: tener un loop donde la condición del loop es chequeada de forma regular y el intervalo del loop es el el máximo delay para terminar el thread. ¿Qué pasa si el thread está ocupando haciendo operaciones y no chequea la condición del while?. En este caso, tener el evento de stop es inútil en esos escenarios y se necesitan métodos más poderosos para sacar el thread. Para este escenario, se tienen ciertas opciones:

- Evitar este escenario completamente usando `asyncio` o `multiprocessing`
- Hacer el thread un daemon thread setteando `your_thread.daemon = True` *antes* de empezar el thread. Esto va a aniquilar el thread automáticamente una vez el proceso principal termina
- Aniquilar el thread desde afuera ya sea diciéndole al sistema operativo que envíe una señal de terminar/aniquilar o levantando una excepción dentro del thread desde el thread principal. Esta es una opción que no debe considerar, es recomendable que no la utilice.

Notar que las mismas limitaciones de `threading` también aplican a `multiprocessing`.

A continuación se incluye un ejemplo para ilustrar como podemos terminar forzadamente o aniquilar un thread (con el riesgo de corrupción de memoria):

In [None]:
import time
import multiprocessing

class Forever(multiprocessing.Process):
    def run(self):
        while True:
            #Haga lo que sea que necesite
            
            time.sleep(0.1)
            
if __name__ == "__main__":
    process = Forever()
    process.start()
    
    #Matar nuestro proceso
    process.terminate()
    
    #Esperar 10 segundos para salir correctamente
    process.join(10)
    
    #Si todavía no sale, matarlo
    if process.exitcode is None:
        process.kill()

## Batch processing usando `concurrent.futures`

A menudo, se quiere "hacer girar" varios threads o procesos y esperar hasta que todos terminen. Este es el caso en donde `concurrent.futures` y `multiprocessing` brillan. Estos te permiten llamar `executor.map()` o `pool.map()`. Solo se necesita crear una lista de elementos a procesar, llamar la función `[executor/pool].map()` y listo.

El *downside* de threading es: solo entrega un beneficio si el recurso externo es lo suficientemente lento para garantizar la sobrecarga de sincronización (synchronization overhead). Con un recurso externo rápido, es probable que experimente *slowdowns* dado que el GIL se convierte en el cuello de botella. CPython solo ejecuta una sola instrucción a la vez así que aquello se puede volver de forma rápida algo problemático.

En términos de performance, debería siempre correr benchmarks para ver que le funciona mejor en cada caso, especialmente cuando se habla en la cantidad de threads.

## Batch processing usando multiprocessing

`multiprocessing` ofrece muchas opciones avanzadas que pueden ser muy convenientes e incluso pueden ayudar al rendimiento en algunos escenarios.

En este caso, utilizaremos `multiprocessing.Pool`, que crea un *process pool* muy similar a `concurrent.futures` executors pero ofrece cosas adicionales:
- `map_async(func, iterable, [..., callback, ...])`: Este método es similar al método `map()` en `concurrent.futures` pero en vez de bloquear, retorna una lista de objetos AsyncResult, así puedes buscar los resultados cuando los necesites.

- `imap(func, iterable[, chunksize])`: Este método, es la versión de generator de `map()`. No precarga los items del iterable, por lo que se pueden procesar de forma segura iterables largos si es que se necesita. Esto puede ser mucho más rápido si se necesitan procesar muchos elementos.

- `imap_unordered(func, iterable[, chunksize])`: Este método es efectivamente lo mismo que `imap()` excepto que retorna los resultados tan pronto como son procesados, lo que puede incrementar el rendimiento aún más.

- `starmap(func, iterable[, chunksize])`: Este método es muy similar al método `map()`, pero soporta múltiples argumentos pasándolos como *args.

- `starmap_async(func, iterable, [..., callback, ...])`: Es el método de no-bloqueo de `starmap()`, pero retorna una lista de objetos AsyncResult para que luego se puedan utilizar a su conveniencia.

## Compartiendo data entre procesos y threads

El intercambio de información es la parte más difícil de multiprocessing, multithreading y programación distribuida en general; que información ignorar, que información compartir, y cual saltar. Cuando sea posible, no transfiera datos, no comparta datos y mantenga todo local. Esto es esencialmente, el paradigma de la programación funcional, cosa por la cual este tipo de paradigma se mezcla muy bien con el multiprocessing. La librería `multiprocessing` tiene diversas opciones para compartir información, pero internamente se puede dividir en dos opciones distintas:

- **Memoria compartida**: Esta es por lejos la solución más rápida, pero solo puede ser usada para tipos inmutables y está restringido para seleccionar unos cuantos tipos y objectos personalizados que son creados a través de `multiprocessing.sharedctypes`. Esta es una solución fantástica si solo se necesitan guardar datos primitivos como int, float, bool, str, bytes, y listas de tamaño fijo o diccionarios.

- `multiprocessing.Manager`: La clase `Manager` ofrece una amplia gama de distintas opciones para guardar y sincronizar datos, como locks, semáforos, queues, lists, dicts, entre otros. Si puede ser *pickled*, puede trabajar con manager.

Para threading, la solución es incluso más sencilla: toda la memoria es compartida, así que por defecto, todos los objetos están disponibles para cada thread, pero hay una excepción llamada thread-local variable.

Dado que múltiples y/o procesos pueden escribir en la misma pieza de memoria al mismo tiempo, puede ser una operación riesgosa. En el mejor de los casos, los cambios se pueden perder por conflictos al momento de escribir, en el peor de ellos, se puede corromper la memoria, lo que podría incluso crashear el intérprete.

### Compartiendo memoria entre procesos

Python ofrece distintas estructuras para hacer el intercambio de memoria entre procesos una operación segura:
- `multiprocessing.Value`
- `multiprocessing.Array`
- `multiprocessing.shared_memory.SharedMemory`
- `multiprocessing.shared_memory.ShareableList`

Para compartir valores primitivos, se puede usar `multiprocessing.Value` y `multiprocessing.Array`. Ambos son esencialmente lo mismo, pero con Array se pueden guardar múltiples valores mientras que Value es un solo valor. Para tipos más avanzados, puede chequear `multiprocessing.sharedctypes`

In [2]:
import multiprocessing

some_int = multiprocessing.Value("i", 123)
with some_int.get_lock():
    some_int.value += 10
    
print(some_int.value)

133


In [3]:
some_double_array = multiprocessing.Array("d", [1, 2, 3])
with some_double_array.get_lock():
    some_double_array[0] += 2.5
    
print(some_double_array)

<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_double_Array_3 object at 0x000001D908E187C0>>


El objeto `multiprocessing.shared_memory.SharedMemory` es similar a Array pero es una estructura *low-level*. Ofrece una interfaz para leer/escribir a un *optionally named block of memory* así se puede acceder desde otros procesos, por su nombre. Además, cuando termina de usarlo usted debe llamar `unlink()` para liberar la memoria.

In [5]:
from multiprocessing import shared_memory

#Desde el proceso A podríamos escribir algo
name = "share_a"
share_a = shared_memory.SharedMemory(name, create = True, size = 4)
share_a.buf[0] = 10

#De un proceso distinto, o el mismo, podemos acceder a los datos
share_a = shared_memory.SharedMemory(name)
print(share_a.buf[0])

#Asegúrese de limpiar al final, solo una vez!
share_a.unlink()

10


El parámetro `create = True` le pide al sistema operativo memoria. Solo despues de ello, podemos referenciar el bloque desde otro proceso.

Finalmente, tenemos `multiprocessing.shared_memory.ShareableList`. Si bien este objeto podría ser un tanto más conveniente que `Array` y `SharedMemory` dado que permite flexibilidad con los types, es una interfaz que sigue siendo difícil de usar y no permite el modificar su tamaño. Mientras que usted puede cambiar el tipo de los elementos, no se puede redimensionar el objeto.

In [8]:
import multiprocessing

def triangle_number_local(n):
    total = 0
    for i in range(n + 1):
        total += i
        
    return total

def bench_local(n, count):
    with multiprocessing.Pool() as pool:
        results = pool.imap_unordered(
            triangle_number_local,
            (n for _ in range(count)),
        )
        print('Sum:', sum(results))

In [9]:
import multiprocessing

class Shared:
    pass

def initializer(shared_value):
    Shared.value = shared_value
    
def triangle_number_shared(n):
    for i in range(n + 1):
        with Shared.value.get_lock():
            Shared.value.value += 1
            
def bench_shared(n, count):
    shared_value = multiprocessing.Value('i', 0)

    # We need to explicitly share the shared_value. On Unix you
    # can work around this by forking the process, on Windows it
    # would not work otherwise
    pool = multiprocessing.Pool(
        initializer=initializer,
        initargs=(shared_value,),
    )

    iterable = (n for _ in range(count))
    list(pool.imap_unordered(triangle_number_shared, iterable))
    print('Sum:', shared_value.value)

    pool.close()

In [None]:
import timeit

if __name__ == '__main__':
    n = 1000
    count = 100
    number = 5

    functions = 'bench_local', 'bench_shared', 'bench_manager'
    for function in functions:
        statement = f'{function}(n={n}, count={count})'
        result = timeit.timeit(
            statement, number=number,
            setup=f'from __main__ import {function}',
        )
        print(f'{statement}: {result:.3f}')

## Compartiendo data entre procesos usando managers

Con un `Manager` podemos compartir lo que sea que pueda ser *pickled* en una manera sencilla si estamos dispuestos a sacrificar un poquito de rendimiento. La gran ventaja de este método es que se puede usar entre múltiples dispositivos. Una de las opciones más convenientes para compartir datos con `multiprocessing` es `multiprocessing.Namespace`

## Thread safety

Cuando se trabaja con threads o procesos, debe estar consciente de que puede que no sea el único modificando una variable en un instante determinado. Si esto sucede, puede llegar a causar bugs que son increíblemente difíciles de *debuggear*.

En muchas ocasiones, el GIL lo va a proteger de estos problemas cuando se use `threading`, pero no tome esta protección por garantizada y asegúrese de proteger las variables si múltiples threads puede que actualicen esa variable al mismo tiempo.

In [19]:
import time
import concurrent.futures

counter = 10

def increment(name):
    global counter
    current_value = counter
    print(f"{name} value before increment: {current_value}")
    counter = current_value + 1
    print(f"{name} value after increment: {counter}")
    
print(f"Before thread start: {counter}")

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(increment, range(3))

print(f"After thread finish: {counter}")

Before thread start: 10
0 value before increment: 10
0 value after increment: 11
1 value before increment: 11
1 value after increment: 12
2 value before increment: 12
2 value after increment: 13
After thread finish: 13


Si bien en este caso se obtuvo 13, no se garantiza de que siempre sea este el resultado correcto.

Cuando esté experimentando errores extraños y difíciles de explicar en un sistema usando múltiples threads/procesos, asegúrese de que si también ocurren cuando corre un solo thread. Errores como estos son causados de forma fácil y pueden ser introducidos fácilmente por *third-party code* que no fue hecho thread-safe.

Para hacer el código thread-safe, se tienen algunas opciones:
- Si no se actualizan variables compartidas desde múltiples threads/procesos en paralelo, no hay nada de lo que preocuparse.
- Use operaciones atómicas cuando modifique sus variables. Una operación atómica es una operación que ejecuta en una sola instrucción, haciendo así que no puedan levantarse conflictos.
- Use locks para proteger sus variables.

## Deadlocks

Un deadlock ocurre cuando threads o procesos están manteniendo un lock mientras esperan a otro thread/proceso para liberar un lock. En algunos casos, puede que incluso tenga un thread/lock que se está esperando a sí mismo.

In [1]:
import time
import threading

a = threading.Lock()
b = threading.Lock()

def thread_0():
    print("Thread 0 locking a")
    with a:
        time.sleep(0.1)
        print("Thread 0 locking b")
        with b:
            print("Thread 0 everything locked")
            

def thread_1():
    print("Thread 1 locking b")
    with b:
        time.sleep(0.1)
        print("Thread 1 locking a")
        with a:
            print("Thread 1 everything locked")

            
threading.Thread(target=thread_0).start()
threading.Thread(target=thread_1).start()

Thread 0 locking a
Thread 1 locking b
Thread 1 locking a
Thread 0 locking b


La función `thread_0` lockea primero a y después b y la función `thread_1` hace esto en el orden inverso. Esto es lo que causa el deadlock; ambos van a esperar al otro para terminar. Para asegurarnos de que alcanzamos el deadlock en este ejemplo, ponemos un pequeño sleep para asegurarnos que `thread_0` no termina antes de que `thread_1` empiece.

En general, hay diversas estrategias que se pueden emplear para evitar los deadlocks:
- Los deadlocks solo pueden ocurrir cuando se tienen múltiple locks.
- Intentar mantener la sección de lock pequeña, así hay menos chances de accidentalmente añadir otro lock dentro de ese lock.
- Siempre tener un orden consistente de locking. Si siempre se lockea en el mismo orden, no se obtienen deadlocks.

## Thread-local variables

¿Qué hacer si le queremos dar a cada thread una variable global por separado? Es aquí donde entra en juego `threading.local`, da un contexto específico para el thread actual. Esto puede ser útil para conexiones a bases de datos.

A continuación se ilustra el uso de variables locales para threads con un pequeño ejemplo:

In [2]:
import threading
import concurrent.futures

context = threading.local()

def init_counter():
    context.counter = 10
    

def increment(name):
    current_value = context.counter
    print(f"{name} value before increment: {current_value}")
    context.counter = current_value + 1
    print(f"{name} value after after increment: {context.counter}")
    
init_counter()
print(f"Before thread start: {context.counter}")

with concurrent.futures.ThreadPoolExecutor(initializer = init_counter) as executor:
    executor.map(increment, range(5))
    
print(f"After thread finish: {context.counter}")

Before thread start: 10
0 value before increment: 10
0 value after after increment: 11
1 value before increment: 11
1 value after after increment: 12
2 value before increment: 12
2 value after after increment: 13
3 value before increment: 13
3 value after after increment: 14
4 value before increment: 14
4 value after after increment: 15
After thread finish: 10


Este ejemplo es a grandes rasgos el mismo que el the `thread-safety`, pero en vez de tener una variablo global counter, ahora usamos `threading.local()` como un contexto para setear la variable counter. Dado que una *thread-local variable* existe dentro del thread y no se copia automáticamente a otros thread, todos los threads tienen que setear `counter` de forma separada.

## ¿Procesos, threads, o un solo thread?

La primera y más importante pregunta que uno se debería de hacer es si realmente necesitamos usar `threading` o `multiprocessing`.

Segundo, uno se debería preguntar que factores están limitando el rendimiento. Si la limitación es I/O externo, podría ser útil usar `asyncio` o `threading` para manejar aquello, pero de todos modos no está garantizado.

Asumiendo que el cuello de botella de I/O puede ser aliviado, todavía se tiene la opción de elegir `asyncio` versus `threading`.

Si el GIL es el cuello de botella dado a *heavy calculations* del código de Python, `multiprocessing` podría ayudar muchísimo. Pero incluso en esos casos, esta no es la única opción, para muchos procesos lentos, puede ayudar emplear librerías rápidas como `numpy`.

## threading vs concurrent.futures

Las ventajas de `threading` por sobre `concurrent.futures` son:
- Podemos especificar el nombre del thread explícitamente, lo que puede ser visto en el administrador de tareas de muchos sistemas operativos
- Podemos crear y empezar explícitamente un *long-running thread* para una función, en vez de descansar en la disponibilidad dentro del thread pool

Si el escenario permite elegir, podría ser más útil usar `concurrent.futures` en vez de `threading`:
- Con `concurrent.futures` se puede intercambiar entre threads y procesos usando `concurrent.futures.ProcessPoolExecutor` en vez de `concurrent.futures.ThreadPoolExecutor`.
- Con `concurrent.futures` se tiene el método `map()` para fácilmente *batch-process* una lista de items sin tener el (potencial) agobio de setear y aniquilar el thread.
- Los objetos `concurrent.futures.Future` que son retornados por los métodos de `concurrent.futures` permiten un fino control sobre los resultados y el manejo.

## multiprocessing vs concurrent.futures

Las ventajas de `multiprocessing` sobre `concurrent.futures` son:
- Muchos métodos avanzados de *mapping* como `imap_unordered` y `starmap`
- Más control sobre el pool (ej. `terminate()`, `close()`)
- Puede ser utilizado sobre múltiples máquinas
- Se puede especificar manualmente el método de inicio (fork, spawn o forkserver).
- Se puede elegir el intérprete de Python

Las ventajas de `concurrent.futures` sobre `multiprocessing` son:
- Se puede cambiar fácilmente a `concurrent.futures.ThreadPoolExecutor`
- Los objetos que se retornan de tipo `Future` permiten mayor control sobre el manejo de los resultados cuando se compara con los objetos AsyncResult que `multiprocessing` usa.

## Hyper-threading vs cores de CPU físicos

Hyper-threading es una tecnología que ofrece cores de CPU virtuales extra a los cores físicos. La idea es, dado que estos cores de CPU virtuales tienen caches separados y otros recursos, se puede cambiar de forma más eficiente entre distintas tareas. Cuando realmente se quiere maximizar el uso de la CPU, es generalmente mejor usar solo el contador de los procesadores físicos.

In [None]:
import timeit
import multiprocessing

def busy_wait(n):
    while n > 0:
        n -= 1
        
def benchmark(n, processes, tasks):
    with multiprocessing.Pool(processes = processes) as pool:
        #Ejecutar la funcion busy_Wait "tasks" veces con el parámetro n
        pool.map(busy_wait, [n for _ in range(tasks)])
        
if __name__ == '__main__':
    n = 100000
    tasks = 128
    for exponent in range(6):
        processes = int(2 ** exponent)
        statement = f'benchmark({n}, {processes}, {tasks})'
        result = timeit.timeit(
            statement,
            number=5,
            setup='from __main__ import benchmark',
        )
        print(f'{statement}: {result:.3f}')
    

El problema con hyper-threading cuando se carga de forma pesada instrucciones al computador. Tan pronto como los procesos usan el 100% de un core de CPU, la tarea de interacambiar procesos, reduce el rendimiento.

## Procesos remotos

Usando la librería `multiprocessing`, es bastante sencillo ejecutar trabajos en servidores remotos, pero la documentación actualmente sigue un poco *críptica*. El módulo `multiprocessing.connection` tiene classes de `Client` y `Listener`, que facilitan la comunicación segura entre clientes y servidores en una manera simple.

La comunicación no es lo mismo que el manejo de procesos y queus, sin embargo, estas características requieren un esfuerzo extra. La librería, en ese sentido, no está muy completa.

## Procesos distribuidos usando multiprocessing.

## Procesos distribuidos usando Dask

La liberría de Dask se está convirtiendo rápidamente en el standard para ejecución distribuida en Python. Tiene una integración muy estrecha con muchas librerías científicas como Numpy y Pandas, haciendo la ejecución paralela en muchos casos completamente transparente.

La librería Dask provee una interfaz paralela fácil para ejecutar single-threaded, usar múltiples threads, usar múltiples procesos, e incluso múltiples máquinas. Mientras se mantenga las limitaciones del intercambio de información entre múltiples threads, procesos, y máquinas en mentes, se puede intercambiar fácilmente entre ellos para ver cual tiene el mejor rendimiento en un caso específico.

## Instalando Dask

La librería Dask consiste de múltiples paquetes y quizás no necesite todos ellos. Puede elegir entre
- `pip install dask[extra]`
- `pip install -U "dask[distributed]"`
- `pip install -U "dask[complete]"`