# Asyncronous in depth. https://www.youtube.com/watch?v=GSiZkP7cI80

A synchronous program is executed one step at a time. Even with conditional branching, loops and function calls, you can still think about the code in terms of taking one execution step at a time.
An asynchronous program behaves differently. It still takes one execution step at a time. The difference is that the system may not wait for an execution step to be completed before moving on to the next one.

### 1. Asyncronous example without generator etc

with simple program we can simulate how asyncrnous actually works. in this example we create baseclass named Task
Task has run & ready attribute. we use run to go to next operation & use ready to check wheter the Task is finished.

In [100]:
import time
from abc import ABC, abstractmethod

class Task:
    def __init__(self):
        self.ready = False
    
    def run():
        return NotImplementedError()

class Sleep(Task):
    def __init__(self, duration:int = None):
        self.threshold = time.time() + duration
        self.ready = False    
    def run(self):
        now = time.time()
        if now >= self.threshold:
            self.ready = True

In [101]:
from typing import Iterable, Set, List

def wait(ts: Iterable[Task]):
    orig:List[Task] = list(ts)
    pending:Set[Task] = set(orig)
    before = time.time()
    
    while pending:
        for task in list(pending):
            task.run()
            if task.ready:
                pending.remove(task)
                
    print(f'duration = {time.time() - before}')

In [102]:
wait([Sleep(3) for _ in range(100)]) # the duration is same even thought in total there are hundred of sleep function

duration = 2.9999938011169434


### 2. Asyncronous example with generator

in this example we use the same kind of program as the first example. but this time it use generator instead of sleep class

In [115]:
from typing import Generator, List

def sleep_generator(duration):
    start = time.time()
    while (time.time() - start) < duration:
        yield
    return 100
        
def wait_gen(ts: List[Generator]):
    orig:List[Task] = list(ts)
    pending:Set[Task] = set(orig)
    before = time.time()
    results = {task:None for task in orig}
    
    while pending:
        for task in list(pending):
            try:
                task.send(None)
            except StopIteration as value:   
                results[task] = str(value)
                pending.remove(task)
                
    print(f'duration = {time.time() - before}')
    print("result", list(map(lambda x: x[:40], results.values())))

In [116]:
wait_gen([sleep_generator(3) for _ in range(100)])

duration = 3.000119924545288
result ['100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100', '100']


In [117]:
start = time.time()
for _ in sleep_generator(2):
    print("loading", time.time() - start)
    time.sleep(0.3)

loading 4.9591064453125e-05
loading 0.3007392883300781
loading 0.6014463901519775
loading 0.9021203517913818
loading 1.2031991481781006
loading 1.5036845207214355
loading 1.8044872283935547


### 3. Asyncronous with multiple related generator

in this example, we handle more than 1 generator for each task. to show how generator can actually called each other & still have asyncronous behavior (pending & resume)

In [118]:
def bar():
    yield from sleep_generator(2) # generator function from example #2
    return 123

def foo():
    value = yield from bar()
    return value

wait_gen([foo() for _ in range(100)])

duration = 2.000267505645752
result ['123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123', '123']


from example 3 above, u can see that it still return 2 seconds in duration even though the sleep generator is called from other generator.
python generator actually go to other generator process from foo yield to sleep generator yield & so on between each foo.

### 4. Real World Example

In [121]:
from requests import get, Response


SIZE = 1000 # how many data you want to get in single stream

def read(r: Response):
    data = b""
    for chunk in r.iter_content(SIZE):
#         print("get chuck data from stream")
        data += chunk
        yield
    return data

def fetch(url):
    with get(url, stream=True) as r:
        data = yield from read(r)
    return data.decode('utf-8')


print("fetch 1 url, google.com", end='\n\n\n')
fetch_gen = fetch("https://www.google.com")

while True:
    try:
        fetch_gen.send(None)
    except StopIteration as value:
        print(str(value)[:40])
        break

fetch 1 url, google.com


<!doctype html><html itemscope="" itemty


In [122]:
results = wait_gen([fetch('https://www.google.com'), fetch('https://www.yahoo.com')])

duration = 0.7462968826293945
result ['<!doctype html><html itemscope="" itemty', '<!DOCTYPE html>\n<html id="atomic" lang="']


### 5. Asyncio, Async Await

In [145]:
import asyncio
import aiohttp

async def wait_io(ts: Iterable):
    before = time.time()
    results = await asyncio.gather(*ts)
    print("duration ", time.time() - before)
    print(list(map(lambda x: x[:20], results)))

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

    
results = await wait_io([fetch('https://www.google.com') for _ in range(100)])

duration  0.5673801898956299
['<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype html><html', '<!doctype