# Multitasking using `async / await` & `asyncio`

http://bit.ly/pycon19-asyncio

by Marte Soliza (marte@insynchq.com)

## Multitasking

* Multiple sequences of instructions (tasks) are executing concurrently.

### Process
* Task in an operating system with their own independent resources.

### Thread
* Task within a process with shared resources.


### Preemptive multitasking
_Using native threads_

In [None]:
import threading
import time


class Counter:
    
    def __init__(self):
        self.value = 0
        
    def increment(self):
        self.value += 1
    
    def decrement(self):
        self.value -= 1

        
def incrementer(counter, n, m):
    for _ in range(n):
        for _ in range(m):
            counter.increment()
        time.sleep(0.001)

def decrementer(counter, n, m):
    for _ in range(n):
        for _ in range(m):
            counter.decrement()
        time.sleep(0.001)

counter = Counter()
threads = []
for _ in range(4):
    threads.append(threading.Thread(target=incrementer, args=(counter, 30, 40000)))
    threads.append(threading.Thread(target=decrementer, args=(counter, 30, 40000)))
    
for thread in threads:
    thread.start()
    
for thread in threads:
    thread.join()
    
print(counter.value)

Race condition! 🏎 Difficult to debug 🐞 (a _Heisenbug_).

You can try tweaking the values above and see that lower values of `m` does not lead to race conditions.

In [None]:
counter = Counter()
threads = []
for _ in range(4):
    threads.append(threading.Thread(target=incrementer, args=(counter, 30, 4000)))
    threads.append(threading.Thread(target=decrementer, args=(counter, 30, 4000)))
    
for thread in threads:
    thread.start()
    
for thread in threads:
    thread.join()
    
print(counter.value)

In [None]:
import dis
dis.dis(Counter.increment)


| Time | Running | Thread 1 (inc) | Thread 2 (dec) |
| ---- | ------- | -------------- | -------------- |
| 0    | 2       | 1000           | 1000           |
| 1    | 1       | 1002           | 1000           |
| 2    | 1       | 1002           | 1000           |
| 3    | 2       | 1002           | 999            |


### Cooperative multitasking
_Using asyncio & coroutines_

In [None]:
import asyncio


class Counter:
    
    def __init__(self):
        self.value = 0
        
    def increment(self):
        self.value += 1
    
    def decrement(self):
        self.value -= 1


async def incrementer(counter, n, m):
    for _ in range(n):
        for _ in range(m):
            counter.increment()
        await asyncio.sleep(0.001)

async def decrementer(counter, n, m):
    for _ in range(n):
        for _ in range(m):
            counter.decrement()
        await asyncio.sleep(0.001)

counter = Counter()
tasks = []
for _ in range(4):
    tasks.append(asyncio.create_task(incrementer(counter, 30, 40000)))
    tasks.append(asyncio.create_task(decrementer(counter, 30, 40000)))

# IMPORTANT: You can only do this on the top level in Jupyter Notebook (or IPython).
await asyncio.wait(tasks)
# Normally, you have to do something like this:
# asyncio.run(asyncio.wait(tasks))

print(counter.value)

## Usage

In [None]:
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(3.0)
    print("World!")
    
await hello()
# IMPORTANT: You can only do the above in Jupyter Notebook (or IPython). Normally, you'll have to run it like this:
#asyncio.run(hello())

### `async`
* Coroutine & couroutine functions

In [None]:
async def coro_fn(): # couroutine function
    print("Hello")
    
coro = coro_fn() # coroutine

print(type(coro_fn))
print(type(coro))

Couroutines are pretty much like generators.

In [None]:
coro.send(None)

### `await`
* Awaitables

In [None]:
async def count():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)

await count()

### Task

In [None]:
import asyncio

async def count():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)

task = asyncio.create_task(count())
asyncio.get_running_loop().call_later(3, task.cancel)
await task

### Future

In [None]:
import asyncio

fut = asyncio.get_running_loop().create_future()

async def hello(fut):
    print("Hello")
    fut.set_result("World")

async def world(fut):
    print(await fut)

await asyncio.gather(world(fut), hello(fut))

### Event Loop
* What actually runs.

In [None]:
import asyncio
asyncio.get_running_loop()

## Advantages

### Explicit is better than implicit
* You know the places where a coroutine can possibly be suspended.
* Transitivity of coroutines.

In [None]:
async def fn1():
    print("Hello")
    await asyncio.sleep(0.5)
    print("World")

async def fn2():
    print("Begin")
    await fn1()
    print("End")
    
await fn2()

### Lightweight

In [None]:
import time
import asyncio

async def idle():
    await asyncio.sleep(5)

coros = [idle() for _ in range(10**5)]

t = time.time()    
await asyncio.wait(coros)
print(time.time() - t)

In [None]:
import threading
import time

def idle():
    time.sleep(5)
    
threads = [threading.Thread(target=idle) for _ in range(10**5)]

t = time.time()
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(time.time() - t)

### Better control

#### Cancel

In [None]:
import asyncio

async def count():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)

task = asyncio.create_task(count())
asyncio.get_running_loop().call_later(3, task.cancel)
await task

#### Timeout

In [None]:
import asyncio

async def count():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)

task = asyncio.create_task(count())

await asyncio.wait_for(task, 3)

### Problem: Restaurant Simulator

In [None]:
from collections import namedtuple
from concurrent.futures import Future
import random
import threading
import queue
import enum
import time


class Event(enum.Enum):
    ENTER = enum.auto()
    ORDER = enum.auto()
    READY = enum.auto()
    LEAVE = enum.auto()
    
Dish = namedtuple("Dish", "name cooking_time")

class Restaurant:
    def __init__(self, menu, num_tables):
        self.menu = menu
        
        self._tables = [False] * num_tables
        self._tables_lock = threading.Lock()        
        self._waiting_customers = []
        self._waiting_customers_lock = threading.Lock()
        
        self._orders = [None] * num_tables
        self._ordered_dishes = []
        self._ordered_dishes_cond = threading.Condition()
        self._ready_dishes = []
        self._ready_dishes_lock = threading.Lock()
        
        self._events_queue = queue.Queue()
        
    def enter(self):
        fut = Future()
        self._events_queue.put((Event.ENTER, fut))
        return fut.result()
    
    def order_food(self, table_id, dishes):
        dish_events = []
        orders = []
        for dish in dishes:
            dish_event = threading.Event()
            orders.append((dish, dish_event))
        self._orders[table_id] = orders
        # Call waiter and place the orders.
        self._events_queue.put((Event.ORDER, table_id))
        
    def wait_for_food(self, table_id):
        # Wait for ALL the dishes to be delivered.
        orders = self._orders[table_id]
        for dish, dish_event in orders:
            dish_event.wait()
    
    def leave(self, table_id):
        # Leave the table.
        self._tables[table_id] = False
        self._events_queue.put((Event.LEAVE,))

    def waiter(self, waiter_id):
        while True:
            event = self._events_queue.get()
            if event[0] == Event.ENTER:
                # Attempt to seat a new customer.
                with self._tables_lock:
                    for table_id, occupied in enumerate(self._tables):
                        if not occupied:
                            # Table found!
                            self._tables[table_id] = True
                            event[1].set_result(table_id)
                            break
                    else:
                        # No available table, make them wait.
                        self._waiting_customers.append(event[1])
                time.sleep(1)
            elif event[0] == Event.ORDER:
                # Take orders.
                table_id = event[1]
                # Queue the orders for the cooks.
                with self._ordered_dishes_cond:
                    for dish, dish_event in self._orders[table_id]:
                        self._ordered_dishes.append((table_id, dish))
                    self._ordered_dishes_cond.notify_all()
                time.sleep(2)
            elif event[0] == Event.READY:
                # Deliver food.
                table_id, dish = self._ready_dishes.pop(0)
                for dish, dish_event in self._orders[table_id]:
                    dish_event.set()
                time.sleep(3)
            elif event[0] == Event.LEAVE:
                with self._waiting_customers_lock:
                    if not self._waiting_customers:
                        continue
                    # Seat a waiting customer.
                    with self._tables_lock:
                        for table_id, occupied in enumerate(self._tables):
                            if not occupied:
                                self._tables[table_id] = True
                                self._waiting_customers.pop(0).set_result(table_id)
                                break
                time.sleep(1)


    def cook(self, cook_id):
        while True:
            with self._ordered_dishes_cond:
                # Wait for orders.
                self._ordered_dishes_cond.wait_for(lambda: len(self._ordered_dishes) > 0)
                # There can be different strategies for cooking. This one is just FIFO (first in, first out).
                table_id, dish = self._ordered_dishes.pop(0)
            time.sleep(dish.cooking_time)
            self._ready_dishes.append((table_id, dish))
            self._events_queue.put((Event.READY,))

def customer(customer_id, restaurant):
    print(f"Customer {customer_id} enters")
    t = time.time()
    table_id = restaurant.enter()
    print(f"Customer {customer_id} is seated at Table {table_id} (waited for {time.time() - t:.0f}min)")
    
    num_dishes = random.randint(1, 3)
    dishes = random.sample(restaurant.menu, num_dishes)
    dish_names = ", ".join(dish.name for dish in dishes)
    print(f"Customer {customer_id} orders {dish_names}")
    restaurant.order_food(table_id, dishes)
    
    t = time.time()
    restaurant.wait_for_food(table_id)
    
    print(f"Customer {customer_id} is ready to eat (waited for {time.time() - t:.0f}min)")
    # Eat.
    time.sleep(num_dishes * 10)
    
    restaurant.leave(table_id)
    print(f"Customer {customer_id} leaves")


def simulate(restaurant, num_waiters, num_cooks, customer_rate):
    for waiter_id in range(num_waiters):
        threading.Thread(target=restaurant.waiter, args=(waiter_id,)).start()
    for cook_id in range(num_cooks):
        threading.Thread(target=restaurant.cook, args=(cook_id,)).start()
    num_customers = 0
    while True:
        num_customers += 1
        threading.Thread(target=customer, args=(num_customers, restaurant)).start()
        time.sleep(1.0 / customer_rate)


In [None]:
random.seed(0)
restaurant = Restaurant([Dish("Sinigang", 15), Dish("Adobo", 10), Dish("Afritada", 12), Dish("Chopsuey", 8)], 8)
simulate(restaurant, 3, 2, 0.5)

## Gotchas

### Uncooperative tasks (hoarding its share of the event loop)

In [None]:
import asyncio
import hashlib

async def count(n):
    for i in range(n):
        print(i)
        await asyncio.sleep(0.5)

async def calculate(n):
    h = hashlib.sha256()
    for i in range(n):
        h = hashlib.sha256(h.digest())
    print(f"Done calculating {n}")
    return h.hexdigest()

await asyncio.gather(count(20), calculate(5000000))

#### Solution 1: yield every now and then
* Abstraction leak -- you have to tweak

In [None]:
async def calculate1(n):
    h = hashlib.sha256()
    for i in range(n):
        h = hashlib.sha256(h.digest())
        if i % 10000 == 0:
            await asyncio.sleep(0)
    print(f"Done calculating {n}")
    return h.hexdigest()

await asyncio.gather(count(20), calculate1(5000000))

#### Solution 2: Run in a separate thread
* Be careful though! 😱

In [None]:
async def calculate2(n):
    def compute():
        h = hashlib.sha256()
        for i in range(n):
            h = hashlib.sha256(h.digest())
        return h.hexdigest()

    result = await asyncio.get_running_loop().run_in_executor(None, compute)
    print(f"Done calculating {n}")
    return result

await asyncio.gather(count(20), calculate2(5000000))

### Not awaiting a coroutine

In [None]:
count(10)
print("Hello world!")

### Swallowed exceptions

In [None]:
async def task():
    await asyncio.sleep(1)
    do_something()
    
class Runner:
    
    async def run(self):
        self.task = asyncio.create_task(task())

runner = Runner()
await runner.run()

In [None]:
runner = None

#### Enable debug mode to get more info

In [None]:
asyncio.get_running_loop().set_debug(True)

In [None]:
await asyncio.gather(count(20), calculate(5000000))

In [None]:
count(10)
print("Hello world!")

In [None]:
runner = Runner()
await runner.run()
del runner