## async/await in Python 3.5 and why it is awesome
Yury Selivanov \
https://www.youtube.com/watch?v=m28fiN9y_r8

- asny/await in Python 3.5


### Coroutines: `yield`
- have to use a function to return
- such function raise an exception internally
- can't use return statement

```Python
@inlineCallbacks
def thingummy():
    thing = yield makeSomeRequestResultingInDeferred()
    if tihng == ' I love Twisted':
        # will become the result of the Deferred
        returnValue('TWISTED IS GREAT')
    else:
        # will trigger an errback
        raise Exception('DESTROY ALL LIFE')
```

### Coroutines: `yield from` from 3.3
- cons: hard to understand how it works
- have to use `@asyncio.coroutine`

```Python
@asyncio.coroutine
def computer(x, y):
    print("Compute {} + {} ...".format(x, y))
    yield from asyncio.sleep(1.0)
    return x + y
```

### Coroutines: `async/await` from 3.5
- no decorator
- always a coroutine: if function doesn't have await inside

```Python
async def computer(x, y):
    print("Compute {} + {} ...".format(x, y))
    await asyncio.sleep(1.0)
    return x + y
```

### Why `async/await` is THE answer
- Dedicated syntax; concise and readable
- New buitin type for coroutines
- New concepts: `async for` and `async with`
- Generac, framework agnostic design
- Fast: only ~2x slower than fun call, 10-100x faster than yield coroutines

### `async/await`
- coroutines are subtypes of generators; shares a lot of code
    - pythonic sense, complete different object
- `await` -> `yield from` -> `YIELD_FORM` opcode
- `@types.coroutine
    - from generator to coroutine
- `__await__`
- `__aenter__`, `__aexit__`, `__aiter__`, `__anext__`

- github.com/magicstack/asnycpg
- `await` prepares database
- `async with`: 
    - allow run async code when enter the block
    - allow run async code when exit the block
- `async for`: call async code while iterating 

```Python
async def series(length):
    st = await con.prepare('SELECT generate_series(0, $1) AS idx`)
    
    async with con.transaction():
        async for rec in st.cursor(length):
            print(rec['idx'])
```

### asyncio
- toolbox for frameworks creators to use
    - not a actual framework. e.g., no http / db driver, etc
- part of standard library
- interperate with twisted and tornado

### What's inside
- standardized pluggable _event loop_
- Interfaces for Protocols and Transports
- Factories for servers and connections, streams
- Futures and Tasks:
    - Callbacks + coroutines, timeouts, cancellations
    - futures: bridge between old style and new async/await style
    - task: __coroutine runner__
- Subprocess, queues, synchronisation primitives(locks, events, etc)

### Common functions
- `asyncio.get_event_loop()`
    - returns an instance of actual event loop, then forget about it and only use `async/await`
- `loop.create_task()`
    - wraps coroutines in a "coroutine runner" (to allow event loop to actually run them)
    - a mechanism for the _event loop_ to work with _async/await_
- `loop.run_until_complete()` and `loop.run_forever()`
- `asyncio.gather()`
    - awaits on __multiple__ coroutines (or tasks or futures)
- `loop.run_in_executor()`
    - runs slow cpu-intensive or blocking IO code in a thread or in a process pool
- `loop.close()`
    - close and clean up resources 
    ```Python
    try:
        loop.run_until_complete(asyncio.gather(t1,t2))
    finally:
        loop.close()
    ```

In [16]:
import asyncio
async def say(what, when):
    await asyncio.sleep(when)
    print(what)

loop = asyncio.get_event_loop()
loop.create_task(say('hello', 0.5))
loop.create_task(say('world', 1))

#loop.run_forever()
# Or
loop.run_until_complete(asyncio.gather(t1, t2))

TypeError: run_until_complete() missing 1 required positional argument: 'future'

hello
world


In [13]:
def compute_pi(digits):
    result = 'example'
    return result

# default loop's executor (asyncio internal thread pool)
await loop.run_in_executor(None, compute_pi, 20000) 
# or

# Thread Pool
with concurrent.futures.ThreadPoolExecutor() as pool:
    await loop.run_in_executor(pool, compute_pi, 20000)

# ProcessPool
with concurrent.futures.ProcessPoolExecutor() as pool:
    await loop.run_in_executor(pool, compute_pi, 20000)
    

NameError: name 'concurrent' is not defined

### Debugging
- always debug mode when learning
    - PYTHONASYNCIODEBUG=1 python app.py
    - `loop.set_debug()`
- config `logging` to see errors
- config test runner to print out __warnings__

### uvloop
github.com/magicstack/uvloop

### PEP 492

# Getting to grips with asyncio

### What & Whay
- I/O is __high latency__
- Sequential programs wasts resources __waiting__ on IO
- Multithreading / multiprocessing carry
    - large __resource__ overheads
    - large __cognitive__ overheads
- GIL

### Definitions
1. Dealing with multiple things at once vs. doing multiple things at once
    - Concurrency \
      Tasks start, run and complete in overlapping time periods \
      asyncio
    - Parallelism \
      Tasks run simultaneously \
      treads / processes + multicore
2. The definition of synchronous contradicts common useage
    - Asynchronous \
      No need to wait before proceeding \
      shorter overall duration
    - Sequential (Synchronous) \
      Must complete before proceeding \
      longer overall duration
3. How much of the potential result is returned? 
    - Asynchronous \
      Returns immediately with a promise for the complete result \
      Callbacks / Futures / Promisses
    - Non-blocking \
      Returns immediately, with no result, partial result or complete result \
      Pooling
4. Preemptive vs Cooperative multitasking
    - Preemptive \
      Scheduler interrupts tasks \
      inconvenient context switches
    - Cooperative \
      Task _yield_ to scheduler \
      Uncooperative task hang system

## Coroutine

### Lucas sequence
```python
def lucas():
    '''
    >>> from coop.main import lucas
    >>> from itertools import islice
    >>>
    >>> list(islice(lucas(), 10))
    [2, 1, 3, 4, ...]
    '''
    yield 2
    a = 2
    b = 1
    while True:
        yield b
        a, b = b, a + b
```

### Linear search
- returns the first item satisfying a predicate
- Search is a regular function. Calling search either: \
  directly `returns` the result or \
  `raises` an exception
    
```python
def search(iterable, predicate):
    '''
    >>> from coop.main import search
    >>> search(lucas(), lambda x: len(str(x)) >= 6)
    103682
    '''
    for item in iterable:
        if predicate(item):
            return item
    raise ValueError("Not Found")
```

### Converting Blocking search to cooperative search
```python
def async_search(iterable, predicate):
    ...
        if predicate(item):
            return item
        yield
    raiseValueError("Not Found")

```

- async_search is a __generator__ function
- call async_search(...) will return a `generator` object
- use `next` to advance
- `yield` nothing and returning item
- when `return` in generator function, python packages the payload into `StopIteration` exception object (use `exc.value`)

### Task
Aggregates a coroutine and an integer id \
thin wrapper around routine
```python
class Task:
    next_id = 0
    def __init__(self, routine):
        self.id = Task.next_id
        Task.next_id += 1
        self.routine = routine
```

```Python
class Scheduler:
        
    def __init__(self):
        self.runnable_tasks = deque()
        self.completed_task_results = {}
        self.failed_task_errors = {}
        
    def add(self, routine):
        task = Task(routine)  # wraps routine in task
        self.runnable_tasks.append(task)
        return task.id
    
    def run_to_completion(self):
        ''' Run the tasks '''
        while len(self.runnable_tasks) != 0:
            task = self.runnable_tasks.popleft()
            print("Running task {} ... ".format(task.id), end='')
            try:
                yielded = next(task.routine)
            except StopIteration as stopped:
                print("Completed with result: {!r}".format(stopped.value))
                self.completed_task_results[task.id] = stopped.value
            except Exception as e:
                print("Failed with exception: {}".format(e))
                self.failed_task_errors[task.id] = e
            else:  # if "try" is successful
                assert yielded is None
                print("Now yielded")
                self.runnable_tasks.append(task)  # run forever
```

### Print a message at intervals
yields control untail a time interval expires \
sleep

```python
# blocking
def repetitive_message(msg, interval_seconds):
    while True:
        print(msg)
        start = time.time()
        expiry = start + interval_seconds
        while True:
            now = time.time()
            if now >= expiry:
                break
```

```python
# async #1
# bug: if interval_seconds is very small 
# the coroutine will NEVER yield and will hog the system
def async_repetitive_message(msg, interval_seconds):
    while True:
        print(msg)
        start = time.time()
        expiry = start + interval_seconds
        while True:
            now = time.time()
            if now >= expiry:
                break
            yield
```

```python
# async #2
# WHERE to yield control
# Ensure coroutine either 1. return immediately or
# 2. always yield __at least once__
def async_repetitive_message(msg, interval_seconds):
    while True:
        print(msg)
        start = time.time()
        expiry = start + interval_seconds
        while True:
            yield
            now = time.time()
            if now >= expiry:
                break
```

- Calling __blocking__ functions in __non-blocking__ function _stall_ the whole system
- `yield from`: call nested generator from generator

- Rule 1: Coroutines are contagious to __callees__ \
  __Everything__ you call - transitively - from a coroutine shoule be non-blocking

- Rule 2: Coroutines are contagious to __callers__ \
  Everything that callse - transitively - to a coroutine must iterate the generator.

- Everything will be async / non-blocking except _main_

### Extract coroutine
```python
def async_repetitive_message(msg, interval_seconds):
    while True:
        print(msg)
        yield from async_sleep(interval_seconds)
        
def async_sleep(interval_seconds):
    start = time.time()
    expiry = start + interval_seconds
    while True:
        yield
        now = time.time()
        if now >= expiry:
            break
```

- async_sleep always yield __at least once__
- async_sleep(0) yields __exactly once__
- Occurrence of __bare yield__ can be replaced with `yield from async_sleep(0)`
- async_sleep provided by scheduler -> no need of bare yield at all

## Asyncio Language and Library
futures, tasks, event-loops

1. `def async_foo` -> `async def foo`
2. `import asyncio`
3. `async_sleep(x)` -> `asyncio.sleep(x)`
4. no more `async def sleep(x)`
5. `yield from foo` -> `await foo`

### asnycio
- Coroutines implements __tasks__
- Coroutines __await__ other coroutines
- Event-loop schedules __concurrent__ tasks
- Tasks must __not block__
- Awaiting facilitates __context switches__: opportunaty for scheduler to run another task
- To yield control __without__ need a result: `await asyncio.sleep(0)`

### Coroutines vs. Coroutine Objects

- Coroutine: `async def search...` \
  code, __callable__
- Coroutine object: `c` in `c = search(lucas(), is_prime)` \
  code + execution state, __awaitable__

### Future
Encapsulates a potential result or error \
monitor a running task
- avoid calling _future = Future()_ constructor directly
- prevents event-loops specialising the future implementation

```
Future
    cancel()
    cancelled()
    done()
    result()
    exception()
    set_result()
    set_exception()
```

```python
async def monitored_search(iterable, predicate, future):
    try:
        found_item = await search(iterable, predicate)
    except ValueError as not_found:
        future.set_exception(not_found)
    else:  # no exception
        future.set_result(found_item)
        
async def monitor_future(future, interval_seconds):
    while not future.done():
        print("Waiting...")
        await asyncio.sleep(interval_seconds)
    print("Done")

async def thirteen_digit_prime(x):
    return (await is_prime(x)) and (len(str(x)) == 13
                                    
loop = asyncio.get_event_loop()
future = loop.create_future()  # Create
co_obj = monitored_search(lucas(), thirteen_digit_prime, future)
loop.create_task(co_obj)
loop.create_task(monitor_future(future, 1.0))
loop.run_until_complete(future)
print(future.result())
loop.close()
```

### Task
A subclass of Future (Task is-a future) which wraps(has) a coroutine

```python
search_task = loop.create_task(co_obj)
loop.create_task(monitor_future(search_task, 1.0))
loop.run_until_complete(future)
print(future.result())
loop.close()
```

### Creating Tasks
Prefer __ensure_future()__ which returns a task:

Bad: `task = Task(coro_obj)`\
OK: `task = loop.create_task(coro_obj)` \
Better: `task = ensure_future(awaitable, loop=loop)` idempoeten, more general

### Waiting for tasks
`asyncio.gather()`
- small bug: _Warning: task was destroyed buy it is pending_ \
  The event-loop is being closed before _monitor_task_ has exited gracefully
```Python
...
search_and_monitor_future = asyncio.gather(search_task, monitor_task)
loop.run_until_complete(search_and_monitor_future)
...
```

### Awaitables
awaitable object can be used in an await expression. \
__coroutines__, __Tasks__, and __Futures__.

## I/O stuff

### I/O await scheduler
Tasks suspend when waiting, scheduled when data ready

### Layered abstractions for asyncio

Layer | based | func
:--- | :--- | :---
coroutines | generator-based | suspendable / resumable functions
event-loop | task-based | await sockets / file descriptors
transports | channel-based | date read / write
protocols | callback-based | connections made / lost, data received
streams | coroutine-based | stream-like comms
