# Friday Technology talk - _Sergei Silnov_ - 03.09.2021

## AsyncIO: making python concurrent code neater

# What options are available for concurrency in Python

![](images/python_ways.svg)

##  What is Async IO?




https://en.wikipedia.org/wiki/Asynchronous_I/O
    
> In computer science, asynchronous I/O (also non-sequential I/O) is a form of input/output processing that permits other processing to continue before the transmission has finished.


## What is needed?

1. Functions that can be paused and continued 
2. Scheduler (event loop)

##  What is AsyncIO?


> `asyncio` is a python package that implements concurrency with coroutines driven by an event loop

# Generator based "Coroutines"
## Back in 2006 - Python 2.5 

https://www.python.org/dev/peps/pep-0342/


In [1]:
def gen(max=5):
    a = 0
    while a < max:
        print("Harvest!")
        yield a
        a += 1

l = gen()
print(l)

print(next(l), next(l))
print(list(l))

<generator object gen at 0x7ff1bcc82cf0>
Harvest!
Harvest!
0 1
Harvest!
Harvest!
Harvest!
[2, 3, 4]


In [2]:
def coro(max=3):
    a = 0
    print("Start something")
    
    while a < max:
        t = yield
        print(f"Consumed! {t}")
        a += 1
        
l = coro()
print(l)
print(next(l))

l.send(1)
l.send("wow")
l.send("pow")

print(list(coro()))

<generator object coro at 0x7ff1b837b0b0>
Start something
None
Consumed! 1
Consumed! wow
Consumed! pow


StopIteration: 

# Python 3.4: Introduction of AsyncIO

https://www.python.org/dev/peps/pep-3156/


In [5]:
# hack for jupyter notebook
import nest_asyncio
nest_asyncio.apply()

In [6]:
import asyncio

@asyncio.coroutine
def counter_coro(n):
    for i in range(n):
        print(i)
        try:
            yield from asyncio.sleep(.1)
            # for i in asyncio.sleep(.1) yield i
        except asyncio.CanceledError:
            break

@asyncio.coroutine
def real_coro():
    yield from asyncio.sleep(1)
    print("Something happened")
    return 24

@asyncio.coroutine
def manager():
    coro = counter_coro(100)
    print(type(coro))
    counter = asyncio.ensure_future(coro)
    result = yield from real_coro()
    counter.cancel()
    return result

loop = asyncio.get_event_loop()
result = loop.run_until_complete(manager())
print("answer:",  result)


  def counter_coro(n):
  def real_coro():
  def manager():


<class 'generator'>
0
1
2
3
4
5
6
7
8
9
Something happened
answer: 24


# Python 3.5 ➡ 3.6 ➡ 3.7 ➡

- Curio https://curio.readthedocs.io/en/latest/
- async/await


In [7]:
import asyncio

async def counter_coro(n):
    for i in range(n):
        print(i)
        await asyncio.sleep(.1)

async def real_coro():
    await asyncio.sleep(1)
    print("Something happened")
    return 24

async def manager():
    coro = counter_coro(100)
    print(type(coro)) # Awaitables: coroutines, Tasks, and Futures
    counter = asyncio.create_task(coro)
    result = await real_coro()
    counter.cancel()
    return result

result = asyncio.run(manager())
print("answer:",  result)


<class 'coroutine'>
0
1
2
3
4
5
6
7
8
9
Something happened
answer: 24


# Dealing with multiple tasks

## High level API

- Coroutines, Tasks, **gather**, shield & wait_for
- Synchronization Primitives - Lock, **Event**
- Streams - networking without callbacks
- Subprocesses - create_subprocess_exec, create_subprocess_shell
- Queues

## Low level API 

- Event loop
- Futures
- Policies, Transports, Protocols ...

In [10]:
import random

async def counter_coro(begin, end, event, boom = 42):
    for i in range(begin, end):
        print(i)
        if  random.randint(1, 20) == 8 :
            event.set()
        if i == boom:
            raise Exception("The answer!")
        await asyncio.sleep(.1)
    return "The end"

async def waiter(event):
    await event.wait()
    raise Exception("Ooops!")

async def main():
    try:
        event = asyncio.Event()
        bunch = asyncio.gather(counter_coro(100, 120, event), counter_coro(35, 80, event), waiter(event),
                               # return_exceptions=True,
                              )
        print(type(bunch)) 
        print(await bunch)
    except Exception as e:
        print(e)

asyncio.run(main())

<class 'asyncio.tasks._GatheringFuture'>
100
35
101
36
102
37
103
38
Ooops!
104
39
105
40
106
41
107
42
108
109
110
111
112
113
114
115
116
117
118
119


# Are there better options? Trio

https://trio.readthedocs.io/en/stable/tutorial.html#an-echo-client

In [9]:
import trio

async def counter_coro(begin, end, boom = 42):
    for i in range(begin, end):
        print(i)
        if i == boom:
            raise Exception("The answer!")
        await trio.sleep(.1)
    return "The end"

async def boom():
    1/0

async def parent():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(counter_coro, 42, 120)
        nursery.start_soon(counter_coro, 35, 80)
        nursery.start_soon(boom)

trio.run(parent)

42
35


Traceback (most recent call last):
  File "/home/ku/.local/share/virtualenvs/python_asyncio-g6L0igIP/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3441, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_113892/817818457.py", line 20, in <module>
    trio.run(parent)
  File "/home/ku/.local/share/virtualenvs/python_asyncio-g6L0igIP/lib/python3.9/site-packages/trio/_core/_run.py", line 1932, in run
    raise runner.main_task_outcome.error
  File "/tmp/ipykernel_113892/817818457.py", line 18, in parent
    nursery.start_soon(boom)
  File "/home/ku/.local/share/virtualenvs/python_asyncio-g6L0igIP/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
    raise combined_error_from_nursery
trio.MultiError: Exception('The answer!'), ZeroDivisionError('division by zero')

Details of embedded exception 1:

  Traceback (most recent call last):
    File "/tmp/ipykernel_113892/817818457.py", line 7, in counter_coro
     

# But why all the interfaces are so different?

https://github.com/alex-sherman/unsync

In [None]:
from unsync import unsync
import time

@unsync
def non_async_function(seconds):
    time.sleep(seconds)
    return 1

@unsync
async def async_function(seconds):
    await asyncio.sleep(seconds)
    return 5

@unsync(cpu_bound=True)
def slow_function():
    a = 10**101
    b = 10**100
    return a/b

start = time.time()
print(slow_function().result()  + async_function(1).result()  + non_async_function(0.5).result())
print('Executed in {} seconds'.format(time.time() - start))

108
109
110
111
112
113
114
115
116
117
118
119


# await questions(folks)



# Thanks and stay tuned!

You can find these slides at:

https://github.com/kumekay/talks/blob/main/python_asyncio/asyncio.ipynb


### Next talk: Practical micropython with uasyncio



Links:
    
- [Fluent Python by Luciano Ramalho](https://www.oreilly.com/library/view/fluent-python/9781491946237/)
- [Async IO in Python: A Complete Walkthrough](https://realpython.com/async-io-python/)
- [Miguel Grinberg Asynchronous Python for the Complete Beginner PyCon 2017](https://www.youtube.com/watch?v=iG6fr81xHKA)
- [Demystifying Python's Async and Await Keywords](https://www.youtube.com/watch?v=F19R_M4Nay4)
- [asyncio: what's next | Yury Selivanov @ PyBay2018](https://www.youtube.com/watch?v=vem5GHboRNM)