# What is asyncio?

- I/O is <strong style="color:red">high latency</strong>
- Sequential program waste resources <strong style="color:red">waiting</strong> on I/O
- Multithreading/Multiprocessing carry:
    - large <strong style="color:red">resources</strong> overheads
    - large <strong style="color:red">cognitive</strong> overheads
- Python interpretor is <strong style="color:red">shared mutable state</strong> protected by GIL

<img src="./concurrency_vs_parallelism.png"> 

<img src="./async_vs_sync.png">

<img src="async_vs_non_blocking.png">

<img src="preemptive_vs_cooperative.png">

# Asyncio allows us to write:
- asyncronous, concurrent, cooperative tasks in sequential style

# Cooperating coroutines :
- (concurrent programming without asyncio)

#### The Lucas Sequence (fibonacci):

In [1]:
def lucas():
    yield 2
    a = 2
    b = 1
    while True:
        yield b
        a, b = b, a + b

In [2]:
from itertools import islice

In [3]:
list(islice(lucas(), 10))

[2, 1, 3, 4, 7, 11, 18, 29, 47, 76]

#### Linear Search:
- Return first item satsifying predicate

In [4]:
def search(iterable, predicate):
    """
    - search is a regular function:
    - calling search either:
         a) directly returns the result or,
         b) raises an exception
    """
    for item in iterable:
        if predicate(item):
            return item
    raise ValueError("Not Found")

In [5]:
search(lucas(), lambda x: len(str(x)) >= 6)

103682

#### Cooperative linear search
- periodically yields control to the caller.
- on completion returns result in exception payload

In [6]:
def async_search(iterable, predicate):
    """
    - async_search is a generator function:
        a) calling async_search always returns a generator object:
        b) search progresses when iterated with next()
        c) Final result "returned" in StopIteration payload
    """
    for item in iterable:
        if predicate(item):
            return item
        yield
    raise ValueError("Not Found")

In [7]:
g = async_search(lucas(), lambda x: x >= 10)
g

<generator object async_search at 0x000001BFB8710750>

In [8]:
next(g)
next(g)
print("hello world")
next(g)
next(g)
next(g)
next(g)

hello world


StopIteration: 11

#### Task
- aggregates a coroutine and an integer id

In [16]:
class Task:
    
    next_id = 0
    
    def __init__(self, routine):
        self.id = Task.next_id
        Task.next_id += 1
        self.routine = routine

In [17]:
from collections import deque

In [18]:
class Schedular:
    
    def __init__(self):
        self.runnable_tasks = deque()
        self.completed_tasks_results = {}
        self.failed_tasks_errors = {}
        
    def add(self, routine):
        task = Task(routine)
        self.runnable_tasks.append(task)
        return task.id
    
    def run_to_completion(self):
        while len(self.runnable_tasks) != 0:
            task = self.runnable_tasks.popleft()
            print("Running task {}...".format(task.id), end="")
            try:
                yielded = next(task.routine)
            except StopIteration as stopped:
                print("Completed with result: {!r}".format(stopped.value))
                self.completed_tasks_results[task.id] = stopped.value
            except Exception as e:
                print("Failed with exception: {}".format(e))
                self.failed_tasks_errors[task.id] = e
            else:
                assert yielded is None
                print("Now yielded")
                self.runnable_tasks.append(task)

In [19]:
schedular = Schedular()

In [20]:
schedular.add(async_search(lucas(), lambda x: len(str(x)) >= 6))

0

In [21]:
schedular.run_to_completion()

Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Now yielded
Running task 0...Completed with result: 103682


In [22]:
schedular.completed_tasks_results.pop(0)

103682

In [23]:
schedular.add(async_search(lucas(), lambda x: len(str(x)) >= 7))
schedular.add(async_search(lucas(), lambda x: len(str(x)) >= 9))

2

In [24]:
schedular.run_to_completion()

Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1...Now yielded
Running task 2...Now yielded
Running task 1

In [25]:
schedular.completed_tasks_results.pop(1)

1149851

In [26]:
schedular.completed_tasks_results.pop(2)

141422324

#### A primality testing function
- simple but inefficient

In [27]:
from math import sqrt

def is_prime(x):
    if x < 2:
        return False
    for i in range(2, int(sqrt(x)) + 1):
        if x % i == 0:
            return False
    return True

In [28]:
is_prime(2)

True

In [29]:
is_prime(12)

False

In [30]:
is_prime(2**31 -1)

True

In [31]:
# is_prime(2**61 -1) # takes week don't run it

### Filter and print coroutine
- similar to async_search, but print all matches

In [32]:
def async_print_matches(iterable, predicate):
    for item in iterable:
        if predicate(item):
            print("Found: ", item, end=", ")
        yield

In [33]:
schedular = Schedular()

In [34]:
schedular.add(async_print_matches(lucas(), is_prime))

3

### Print a message at intervals
- Yields control until a time interval expires

In [35]:
import time

In [36]:
def async_repetitive_message(message, interval_seconds):
    while True:
        print(message)
        start = time.time()
        expiry = start + interval_seconds
        while True:
            yield # ensure that coroutines always yield at lease once if they can't complete immediately
            now = time.time()
            if now >= expiry:
                break

In [37]:
schedular = Schedular()

In [38]:
schedular.add(async_repetitive_message("Unattended baggage will be destroyed", 2.5))

4

In [39]:
schedular.add(async_print_matches(lucas(), is_prime))

5

### In above code:
- <code>async_print_matches</code> is non-blocking but <code>is_prime</code> blocks on large numbers, stalling the whole system

### Everything we call - transitively - from a coroutine should be non-blocking.
- coroutines are contagious to <strong>callees</strong>

In [40]:
from math import sqrt

def async_is_prime(x):
    if x < 2:
        return False
    for i in range(2, int(sqrt(x)) + 1):
        if x % i == 0:
            return False
        yield
    return True

In [41]:
def async_print_matches(iterable, predicate):
    for item in iterable:
        matches = yield from async_predicate(item) # allows the predicate to make progress and yield control by invoking with yield from
        if matches:
            print("Found: ", item)
        # yield      # => here we dont need yield

### Everything that calls - transitively - to a coroutine must iterate the generator.
- coroutines are contagious to <strong>callers</strong>

### Refactoring - extracting coroutine
- requires invoking via yield from

In [42]:
import time
from math import sqrt


def async_sleep(interval_seconds):
    """
    async_sleep always yields atleast once 
    async_sleep(0) yields exactly once
    """
    start = time.time()
    expiry = start + interval_seconds
    while True:
        yield      
        now = time.time()
        if now >= expiry:
            break

            
def async_is_prime(x):
    if x < 2:
        return False
    for i in range(2, int(sqrt(x)) + 1):
        if x % i == 0:
            return False
        yield from async_sleep(0)
    return True


def async_search(iterable, predicate):
    """
    - async_search is a generator function:
        a) calling async_search always returns a generator object:
        b) search progresses when iterated with next()
        c) Final result "returned" in StopIteration payload
    """
    for item in iterable:
        if predicate(item):
            return item
        yield from async_sleep(0)
    raise ValueError("Not Found")

    
def async_print_matches(iterable, async_predicate):
    for item in iterable:
        matches = yield from async_predicate(item) # allows the predicate to make progress and yield control by invoking with yield from
        if matches:
            print("Found : ", item)
        # yield      # => here we dont need yield
        
        
def async_repetitive_message(message, interval_seconds):
    while True:
        print(message)
        yield from async_sleep(interval_seconds)

In [43]:
schedular = Schedular()

In [44]:
schedular.add(async_repetitive_message("Unattended baggage will be destroyed", 2.5))

6

In [45]:
schedular.add(async_print_matches(lucas(), async_is_prime))

7