The examples are based on this page:

https://medium.com/analytics-vidhya/asyncio-threading-and-multiprocessing-in-python-4f5ff6ca75e8

### Sending messages

Example of running two tasks sequentially.

In [5]:
# Python's logging module adds a timestamp to every message automatically
import logging
import time

# Logger configuration

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")


num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

# Basic task: wait `delay` seconds, then "send" (print) the message

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")
    return message

def main():
    logging.info("Main started")
    delay_message(2, num_word_mapping[2])
    delay_message(3, num_word_mapping[3])
    logging.info("Main Ended")

main()

17:52:04:MainThread:Main started
17:52:04:MainThread:TWO received
17:52:06:MainThread:Printing TWO
17:52:06:MainThread:THREE received
17:52:09:MainThread:Printing THREE
17:52:09:MainThread:Main Ended


### Concurrency with threads

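Concurrent ("parallel") execution with a shared memory space. In CPython only one thread runs Python bytecode at a time (the GIL), but a blocking call such as `time.sleep` releases it, so the threads' waits overlap.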

In [6]:
import threading

def main():
    logging.info("Main started")
    # One thread per (delay, message) pair, with delays of 2 to 6 seconds
    threads = [threading.Thread(target=delay_message, args=(i, num_word_mapping[i])) for i in range(2, 7)]

    # Start the threads
    for thread in threads:
        thread.start()

    # Wait for the threads to finish (synchronization)
    for thread in threads:
        thread.join()  # blocks until this thread has completed its task
    logging.info("Main Ended")

main()

17:52:12:MainThread:Main started
17:52:12:Thread-14 (delay_message):TWO received
17:52:12:Thread-15 (delay_message):THREE received
17:52:12:Thread-16 (delay_message):FOUR received
17:52:12:Thread-17 (delay_message):FIVE received
17:52:12:Thread-18 (delay_message):SIX received
17:52:14:Thread-14 (delay_message):Printing TWO
17:52:15:Thread-15 (delay_message):Printing THREE
17:52:16:Thread-16 (delay_message):Printing FOUR
17:52:17:Thread-17 (delay_message):Printing FIVE
17:52:18:Thread-18 (delay_message):Printing SIX
17:52:18:MainThread:Main Ended
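To compare the two approaches directly, the following sketch times the same `time.sleep`-based task run sequentially and in threads. It assumes short, hypothetical delays (0.2, 0.3 and 0.4 s) instead of the ones above so it runs quickly; the helper names `wait`, `run_sequential` and `run_threaded` are illustrative.

```python
import threading
import time

# Hypothetical delays, much shorter than the 2 to 6 seconds used above
DELAYS = [0.2, 0.3, 0.4]

def wait(delay):
    # Blocking call; a sleeping thread releases the GIL, so other threads keep running
    time.sleep(delay)

def run_sequential(delays):
    start = time.perf_counter()
    for delay in delays:
        wait(delay)
    return time.perf_counter() - start

def run_threaded(delays):
    start = time.perf_counter()
    threads = [threading.Thread(target=wait, args=(delay,)) for delay in delays]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # the total is bounded by the slowest thread
    return time.perf_counter() - start

sequential = run_sequential(DELAYS)
threaded = run_threaded(DELAYS)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

The sequential run takes roughly the sum of the delays, the threaded run roughly the longest one, matching the logs above (5 messages finished in 6 seconds instead of 20).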


In [7]:
import threading

def main():
    logging.info("Main started")
    threads = [threading.Thread(target=delay_message, args=(i, num_word_mapping[i])) for i in range(2, 7)]

    # Start the threads
    for thread in threads:
        thread.start()

    # No join(): main() does not wait, so "Main Ended" is logged while the threads are still sleeping
    logging.info("Main Ended")

main()

17:52:23:MainThread:Main started
17:52:23:Thread-19 (delay_message):TWO received
17:52:23:Thread-20 (delay_message):THREE received
17:52:23:Thread-21 (delay_message):FOUR received
17:52:23:Thread-22 (delay_message):FIVE received
17:52:23:Thread-23 (delay_message):SIX received
17:52:23:MainThread:Main Ended
17:52:25:Thread-19 (delay_message):Printing TWO
17:52:26:Thread-20 (delay_message):Printing THREE
17:52:27:Thread-21 (delay_message):Printing FOUR
17:52:28:Thread-22 (delay_message):Printing FIVE
17:52:29:Thread-23 (delay_message):Printing SIX
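Without `join()`, `main()` returns while the worker threads are still running; the interpreter itself still waits for non-daemon threads before exiting. A minimal sketch, with an illustrative `worker` function, that checks this with `Thread.is_alive()`:

```python
import threading
import time

def worker():
    time.sleep(0.2)

thread = threading.Thread(target=worker)
thread.start()
alive_before_join = thread.is_alive()  # True: start() returns immediately
thread.join()                          # block until worker() has returned
alive_after_join = thread.is_alive()   # False: the thread has finished
print(alive_before_join, alive_after_join)  # True False
```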


### Reusing threads

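Using a pool structure for the threads: `concurrent.futures.ThreadPoolExecutor` keeps a fixed number of worker threads (`max_workers`) and reuses them for the submitted tasks, so with two workers at most two messages are processed at a time.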

In [8]:
import concurrent.futures as cf


with cf.ThreadPoolExecutor(max_workers=2) as executor:
    future_to_mapping = {executor.submit(delay_message, i, num_word_mapping[i]): num_word_mapping[i] for i in range(2, 7)}
    for future in cf.as_completed(future_to_mapping):
        logging.info(f"{future.result()} Done")

17:52:31:ThreadPoolExecutor-1_0:TWO received
17:52:31:ThreadPoolExecutor-1_1:THREE received
17:52:33:ThreadPoolExecutor-1_0:Printing TWO
17:52:33:ThreadPoolExecutor-1_0:FOUR received
17:52:33:MainThread:TWO Done
17:52:34:ThreadPoolExecutor-1_1:Printing THREE
17:52:34:ThreadPoolExecutor-1_1:FIVE received
17:52:34:MainThread:THREE Done
17:52:37:ThreadPoolExecutor-1_0:Printing FOUR
17:52:37:ThreadPoolExecutor-1_0:SIX received
17:52:37:MainThread:FOUR Done
17:52:39:ThreadPoolExecutor-1_1:Printing FIVE
17:52:39:MainThread:FIVE Done
17:52:43:ThreadPoolExecutor-1_0:Printing SIX
17:52:43:MainThread:SIX Done
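`as_completed` yields futures in the order they finish. When the results are needed in submission order, `Executor.map` is an alternative. A sketch with a hypothetical `slow_square` task whose delay decreases with the input, so the completion order is the reverse of the submission order:

```python
import concurrent.futures as cf
import time

def slow_square(n):
    # Larger inputs finish sooner, so completion order differs from submission order
    time.sleep(0.05 * (4 - n))
    return n * n

inputs = [1, 2, 3]

with cf.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_square, n) for n in inputs]
    completion_order = [f.result() for f in cf.as_completed(futures)]
    # map() returns results in the order of the inputs, whatever the finishing order
    submission_order = list(executor.map(slow_square, inputs))

print(completion_order)   # typically [9, 4, 1]
print(submission_order)   # [1, 4, 9]
```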


### Concurrency with the AsyncIO library

* Coroutine: unlike a conventional function, which has a single exit point, a coroutine can pause and resume its execution. Creating one is as simple as putting the `async` keyword before a function definition (`async def`); calling that function returns a coroutine object, which must be awaited or scheduled to run.

* Event loop (coordinator): the component that schedules and runs coroutines and tasks. You can think of it as a scheduler or master.

* Coroutines, Tasks and Futures are awaitable objects. A coroutine can `await` an awaitable; while it waits, its execution is suspended, and it resumes once the awaited object has completed.
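The three kinds of awaitable listed above can be shown side by side. This is a minimal sketch with an illustrative `add` coroutine; it uses `asyncio.run()` so it runs as a plain script (inside Jupyter, where a loop is already running, `await main()` would be used instead).

```python
import asyncio

async def add(a, b):
    # Suspends at the await and resumes after the sleep completes
    await asyncio.sleep(0.01)
    return a + b

async def main():
    loop = asyncio.get_running_loop()
    # 1. Awaiting a coroutine directly
    r1 = await add(1, 2)
    # 2. Wrapping the coroutine in a Task schedules it on the event loop
    task = asyncio.create_task(add(3, 4))
    r2 = await task
    # 3. A low-level Future, completed later by a callback scheduled on the loop
    fut = loop.create_future()
    loop.call_later(0.01, fut.set_result, 42)
    r3 = await fut
    return r1, r2, r3

results = asyncio.run(main())
print(results)  # (3, 7, 42)
```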

In [9]:
import asyncio

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay)  # time.sleep is a blocking call and cannot be awaited, so asyncio.sleep is used instead
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    logging.info("Creating tasks")
    task_1 = asyncio.create_task(delay_message(2, num_word_mapping[2])) 
    task_2 = asyncio.create_task(delay_message(3, num_word_mapping[3]))
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    await task_1 # suspends execution of coroutine and gives control back to event loop while awaiting task completion.
    await task_2
    logging.info("Main Ended")

await main()  # top-level await works in Jupyter/IPython; in a script use asyncio.run(main())

17:52:43:MainThread:Main started
17:52:43:MainThread:Current registered tasks: 2
17:52:43:MainThread:Creating tasks
17:52:43:MainThread:Current registered tasks: 4
17:52:43:MainThread:TWO received
17:52:43:MainThread:THREE received
17:52:45:MainThread:Printing TWO
17:52:46:MainThread:Printing THREE
17:52:46:MainThread:Main Ended
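Outside Jupyter there is no running event loop, so top-level `await` is not allowed; `asyncio.run()` creates a loop, runs the coroutine and closes the loop. A sketch of the previous example as a script, assuming shorter hypothetical delays and adding a timer to show that the two tasks overlap:

```python
import asyncio
import time

async def delay_message(delay, message):
    await asyncio.sleep(delay)  # non-blocking: hands control back to the event loop
    return message

async def main():
    start = time.perf_counter()
    # create_task schedules both coroutines right away, before either is awaited
    task_1 = asyncio.create_task(delay_message(0.2, "TWO"))
    task_2 = asyncio.create_task(delay_message(0.3, "THREE"))
    results = [await task_1, await task_2]
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # ['TWO', 'THREE'] after about 0.3s, not 0.5s
```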


### Another way to create AsyncIO tasks

Using `asyncio.gather` to create several tasks at once: it wraps each coroutine in a task and waits until all of them have finished.

In [10]:
async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i+1, num_word_mapping[i+1]) for i in range(5)]) # awaits completion of all tasks
    logging.info("Main Ended")


await main()

17:52:52:MainThread:Main started
17:52:52:MainThread:Creating multiple tasks with asyncio.gather
17:52:52:MainThread:ONE received
17:52:52:MainThread:TWO received
17:52:52:MainThread:THREE received
17:52:52:MainThread:FOUR received
17:52:52:MainThread:FIVE received
17:52:53:MainThread:Printing ONE
17:52:54:MainThread:Printing TWO
17:52:55:MainThread:Printing THREE
17:52:56:MainThread:Printing FOUR
17:52:57:MainThread:Printing FIVE
17:52:57:MainThread:Main Ended
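`asyncio.gather` also collects the coroutines' return values, in the order the awaitables were passed rather than the order they finish. A sketch using illustrative `echo` and `fails` coroutines, including the `return_exceptions=True` option:

```python
import asyncio

async def echo(delay, value):
    await asyncio.sleep(delay)
    return value

async def fails():
    raise ValueError("boom")

async def main():
    # The first coroutine finishes last, but its result still comes first
    ordered = await asyncio.gather(echo(0.03, "a"), echo(0.02, "b"), echo(0.01, "c"))
    # With return_exceptions=True an exception is returned in its slot instead of being raised
    mixed = await asyncio.gather(echo(0.01, "ok"), fails(), return_exceptions=True)
    return ordered, mixed

ordered, mixed = asyncio.run(main())
print(ordered)  # ['a', 'b', 'c']
print(mixed)    # ['ok', ValueError('boom')]
```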


### Beware of blocking calls inside tasks

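An asyncio task has exclusive use of the event loop until it gives control back at an `await`, so this has to be handled carefully. A blocking call such as `time.sleep` inside a coroutine freezes every other task until it returns: in the example below, while THREE sleeps, ONE and TWO cannot finish even though their own waits have already elapsed.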

In [11]:
async def delay_message(delay, message):
    logging.info(f"{message} received")
    if message != 'THREE':
        await asyncio.sleep(delay) # non-blocking call. gives up execution
    else:
        time.sleep(delay) # blocking call
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i+1, num_word_mapping[i+1]) for i in range(5)]) # awaits completion of all tasks
    logging.info("Main Ended")


await main()  # runs on the notebook's already running event loop

17:53:09:MainThread:Main started
17:53:09:MainThread:Creating multiple tasks with asyncio.gather
17:53:09:MainThread:ONE received
17:53:09:MainThread:TWO received
17:53:09:MainThread:THREE received
17:53:12:MainThread:Printing THREE
17:53:12:MainThread:FOUR received
17:53:12:MainThread:FIVE received
17:53:12:MainThread:Printing ONE
17:53:12:MainThread:Printing TWO
17:53:16:MainThread:Printing FOUR
17:53:17:MainThread:Printing FIVE
17:53:17:MainThread:Main Ended
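When a blocking call cannot be replaced by an awaitable equivalent, one option (Python 3.9+) is `asyncio.to_thread`, which runs the call in a worker thread so the event loop stays free. A sketch of the example above with the blocking branch moved to a thread, assuming shorter hypothetical delays:

```python
import asyncio
import time

async def delay_message(delay, message):
    if message != "THREE":
        await asyncio.sleep(delay)
    else:
        # time.sleep still blocks, but in a separate thread instead of the event loop
        await asyncio.to_thread(time.sleep, delay)
    return message

async def main():
    start = time.perf_counter()
    messages = await asyncio.gather(
        delay_message(0.1, "ONE"), delay_message(0.2, "TWO"), delay_message(0.3, "THREE")
    )
    return messages, time.perf_counter() - start

messages, elapsed = asyncio.run(main())
print(messages, f"{elapsed:.2f}s")  # about 0.3s: the three waits overlap
```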


### Race Conditions

Multithreaded code can suffer from race conditions. This is especially tricky with external libraries, because we need to check whether they support multithreaded use. For example, the `Session` object of the popular `requests` module is not thread-safe.


In [8]:
import concurrent.futures as cf

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
db = DbUpdate()

with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(10)]

logging.info(f"Final value is {db.value}")

17:34:00:ThreadPoolExecutor-2_0:Update Started
17:34:00:ThreadPoolExecutor-2_1:Update Started
17:34:00:ThreadPoolExecutor-2_2:Update Started
17:34:00:ThreadPoolExecutor-2_3:Update Started
17:34:00:ThreadPoolExecutor-2_4:Update Started
17:34:00:ThreadPoolExecutor-2_0:Sleeping
17:34:00:ThreadPoolExecutor-2_1:Sleeping
17:34:00:ThreadPoolExecutor-2_2:Sleeping
17:34:00:ThreadPoolExecutor-2_3:Sleeping
17:34:00:ThreadPoolExecutor-2_4:Sleeping
17:34:02:ThreadPoolExecutor-2_0:Reading Value From Db
17:34:02:ThreadPoolExecutor-2_1:Reading Value From Db
17:34:02:ThreadPoolExecutor-2_3:Reading Value From Db
17:34:02:ThreadPoolExecutor-2_4:Reading Value From Db
17:34:02:ThreadPoolExecutor-2_2:Reading Value From Db
17:34:02:ThreadPoolExecutor-2_0:Updating Value
17:34:02:ThreadPoolExecutor-2_1:Updating Value
17:34:02:ThreadPoolExecutor-2_3:Updating Value
17:34:02:ThreadPoolExecutor-2_4:Updating Value
17:34:02:ThreadPoolExecutor-2_2:Updating Value
17:34:02:ThreadPoolExecutor-2_0:Update Finished
17:34:0

In [9]:
LOCK = threading.Lock()

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        with LOCK:
            logging.info("Reading Value From Db")
            tmp = self.value**2 + 1
            logging.info("Updating Value")
            self.value = tmp
            logging.info("Update Finished")
        
db = DbUpdate()

with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")

17:34:59:ThreadPoolExecutor-3_0:Update Started
17:34:59:ThreadPoolExecutor-3_1:Update Started
17:34:59:ThreadPoolExecutor-3_0:Sleeping
17:34:59:ThreadPoolExecutor-3_1:Sleeping
17:35:01:ThreadPoolExecutor-3_0:Reading Value From Db
17:35:01:ThreadPoolExecutor-3_0:Updating Value
17:35:01:ThreadPoolExecutor-3_0:Update Finished
17:35:01:ThreadPoolExecutor-3_1:Reading Value From Db
17:35:01:ThreadPoolExecutor-3_1:Updating Value
17:35:01:ThreadPoolExecutor-3_1:Update Finished
17:35:01:MainThread:Final value is 2
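The lost update in the first run can be reproduced deterministically. In this sketch (all names are illustrative), a `threading.Barrier` forces every thread to read the shared value before any of them writes it back, the same interleaving visible in the first log, where all threads read before any of them updates. The locked version turns the read-modify-write into a critical section, as `with LOCK:` does in `DbUpdate.update`.

```python
import threading

N_THREADS = 5

def run_workers(worker):
    threads = [threading.Thread(target=worker) for _ in range(N_THREADS)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

def unsafe_counter():
    state = {"value": 0}
    barrier = threading.Barrier(N_THREADS)

    def worker():
        current = state["value"]      # read
        barrier.wait()                # every thread reads before anyone writes
        state["value"] = current + 1  # write: overwrites the other threads' updates

    run_workers(worker)
    return state["value"]

def safe_counter():
    state = {"value": 0}
    lock = threading.Lock()

    def worker():
        with lock:  # only one thread at a time runs the read-modify-write
            state["value"] = state["value"] + 1

    run_workers(worker)
    return state["value"]

unsafe, safe = unsafe_counter(), safe_counter()
print(unsafe, safe)  # 1 5
```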


### AsyncIO version

Because a task fully controls when it suspends its execution (only at `await` points), race conditions are rare with asyncio. They are still possible when an `await` falls between reading and writing shared state.

In [12]:
class DbUpdate:
    def __init__(self):
        self.value = 0

    async def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        await asyncio.sleep(1)
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
async def main():
    db = DbUpdate()
    await asyncio.gather(*[db.update() for _ in range(10)])
    logging.info(f"Final value is {db.value}")
    
await main()

17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:02:MainThread:Update Started
17:55:02:MainThread:Sleeping
17:55:03:MainThread:Reading Value From Db
17:55:03:MainThread:Updating Value
17:55:03:MainThread:Update Finished
17:55:03:MainThread:Reading Value From Db
17:55:03:MainThread:Updating Value
17:55:03:MainThread:Update Finished
17:55:03:MainThread:Reading Value From Db
17:55:03:MainThread:Updating Value
17:55:03:MainThread:Update Finished
17:55:03:MainThread:R

As you can see, once a task resumed after sleeping, it did not give up control until it had finished executing the coroutine. With threading, thread switches are not obvious, but with asyncio we decide exactly where a coroutine's execution may be suspended. Things can still go wrong, though, for example when two coroutines await each other. In the next example this is not a deadlock but infinite recursion: awaiting a coroutine runs it directly inside the caller, so `foo` and `boo` keep calling each other until Python raises `RecursionError`.
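asyncio does not eliminate race conditions; it only limits task switches to `await` points. If an `await` sits between reading and writing shared state, updates are lost just as with threads. A sketch with illustrative names, where `asyncio.sleep(0)` stands in for any real suspension point (for example a database call) and `asyncio.Lock` fixes the problem:

```python
import asyncio

N_TASKS = 5

async def unsafe_counter():
    state = {"value": 0}

    async def update():
        current = state["value"]      # read
        await asyncio.sleep(0)        # suspension point between read and write
        state["value"] = current + 1  # write based on a stale read

    await asyncio.gather(*[update() for _ in range(N_TASKS)])
    return state["value"]

async def safe_counter():
    state = {"value": 0}
    lock = asyncio.Lock()

    async def update():
        async with lock:  # other tasks wait here until this update is complete
            current = state["value"]
            await asyncio.sleep(0)
            state["value"] = current + 1

    await asyncio.gather(*[update() for _ in range(N_TASKS)])
    return state["value"]

unsafe = asyncio.run(unsafe_counter())
safe = asyncio.run(safe_counter())
print(unsafe, safe)  # 1 5
```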

In [13]:
import asyncio 

async def foo():
    await boo()
    
async def boo():
    await foo()
    
async def main():
    await asyncio.gather(*[foo(), boo()])
    
await main()

RecursionError: maximum recursion depth exceeded
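A genuine deadlock in asyncio does not crash: the tasks simply wait forever. In this sketch (names are illustrative), two tasks take two `asyncio.Lock`s in opposite order; `asyncio.wait_for` with a timeout is used only to detect the hang, and acquiring the locks in a consistent order avoids it.

```python
import asyncio

async def worker(first, second):
    async with first:
        await asyncio.sleep(0.01)  # give the other task time to take its first lock
        async with second:
            return "done"

async def run(consistent_order):
    lock_a, lock_b = asyncio.Lock(), asyncio.Lock()
    if consistent_order:
        jobs = [worker(lock_a, lock_b), worker(lock_a, lock_b)]
    else:
        # Each task holds one lock and waits for the other: neither can proceed
        jobs = [worker(lock_a, lock_b), worker(lock_b, lock_a)]
    try:
        return await asyncio.wait_for(asyncio.gather(*jobs), timeout=0.5)
    except asyncio.TimeoutError:
        return "deadlock"

deadlocked = asyncio.run(run(consistent_order=False))
ordered_run = asyncio.run(run(consistent_order=True))
print(deadlocked)   # deadlock
print(ordered_run)  # ['done', 'done']
```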

### Multiprocessing

Multiprocessing is very useful for CPU-intensive programs: each process has its own interpreter and its own GIL, so Python code runs in parallel on several cores instead of taking turns as threads do. The following code runs merge sort on 1000 lists of 30000 elements. Bear with me if the merge sort implementation is a bit clumsy.

#### Synchronous version

In [17]:
import math
import numpy as np

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) <= 1:  # base case; also covers an empty list
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out


logging.info("Starting Sorting")
for r_list in r_lists:
    _ = merge_sort(r_list)
logging.info("Sorting Completed")

18:01:55:MainThread:Starting Sorting
18:02:31:MainThread:Sorting Completed


#### Parallel version (multiprocessing)

In [None]:
import concurrent.futures as cf
import logging
import math
import numpy as np
import time
import threading

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

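def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) <= 1:  # base case; also covers an empty list
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

# Run this cell as a .py script: with the "spawn" start method (Windows, macOS),
# worker processes cannot load functions defined interactively in a notebook
if __name__ == '__main__':
    # Build the input only in the parent process; under "spawn" every worker
    # re-imports this module, and module-level code would run again in each one
    r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

    logging.info("Starting Sorting")
    with cf.ProcessPoolExecutor() as executor:  # by default, one worker per CPU core
        sorted_lists_futures = [executor.submit(merge_sort, r_list) for r_list in r_lists]
        # Each list is pickled to a worker and its sorted copy is pickled back
        sorted_lists = [future.result() for future in sorted_lists_futures]
    logging.info("Sorting Completed")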
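The version above parallelises across many independent lists. To sort one large list, a common pattern is to split it into chunks, sort the chunks in worker processes and merge them in the parent with `heapq.merge`. A sketch under those assumptions: `split_chunks` and `parallel_sort` are illustrative names, and the hypothetical `executor_cls` parameter lets the same code run with a `ThreadPoolExecutor`, for example for a quick check. For small inputs the cost of pickling the chunks can outweigh the gain.

```python
import concurrent.futures as cf
import heapq
import random

def split_chunks(data, n_chunks):
    # Contiguous slices of roughly equal size (ceiling division, at least 1)
    size = max(1, -(-len(data) // n_chunks))
    return [data[i:i + size] for i in range(0, len(data), size)]

def parallel_sort(data, n_workers=4, executor_cls=cf.ProcessPoolExecutor):
    chunks = split_chunks(data, n_workers)
    with executor_cls(max_workers=n_workers) as executor:
        # Each worker sorts one chunk; the built-in sorted can be pickled
        sorted_chunks = list(executor.map(sorted, chunks))
    # k-way merge of the already sorted chunks, done in the parent
    return list(heapq.merge(*sorted_chunks))

if __name__ == "__main__":
    rng = random.Random(0)
    data = [rng.randrange(500000) for _ in range(100000)]
    assert parallel_sort(data) == sorted(data)
```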