# Way Three: Asyncio

## A way to think of concurrent/parallel problems

* One core
* 2-N cores
* Distributed computing

### One core
* Sequential programming
* Threads
* Asynchronous programming

### 2-N core
* Everything in the one-core model
* plus: processes

### Distributed computing
* Everything in the 2-N core model (at each node)
* plus: services, communication channels, CAP theorem, ...


## The async question: How can we best utilize a single core?

Single cores are MUCH faster than many people realize:

  * SIMD
  * GPU


# Motivating example: Download thousands of websites, concurrently, using a single core and asyncio

In [None]:
import asyncio
import time

from aiohttp import ClientSession

loop = asyncio.get_event_loop()

async def fetch(url, session):
    async with session.get(url) as response:
        delay = response.headers.get("DELAY")
        date = response.headers.get("DATE")
        print("{}:{} with delay {}".format(date, response.url, delay))
        return await response.read()


async def bound_fetch(sem, url, session):
    # Fetch the URL while holding the semaphore, which caps concurrent requests.
    async with sem:
        return await fetch(url, session)


async def run(r):
    start_time = time.time()
    url = "https://github.com"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(1000)

    # Create a client session so that we don't open a new connection
    # for each request.
    async with ClientSession() as session:
        for i in range(r):
            # pass Semaphore and session to every GET request
            task = loop.create_task(bound_fetch(sem, url, session))
            tasks.append(task)

        # Wait for every request to finish
        responses = await asyncio.gather(*tasks)
        end_time = time.time()
        print(end_time - start_time)

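number = 500
# This works in Jupyter, where the event loop is already running. In a plain
# script, call loop.run_until_complete(run(number)) instead.
future = loop.create_task(run(number))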
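
The semaphore is what keeps the example above from opening every connection at once. Here is a minimal, network-free sketch of the same pattern, assuming a hypothetical `fake_fetch` that sleeps instead of calling `session.get`. The names `fake_fetch`, `run_bounded` and `stats` are made up for illustration.

```python
import asyncio


async def fake_fetch(sem, stats):
    # Hypothetical stand-in for a real request: sleeps instead of doing I/O.
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)
        stats["active"] -= 1


async def run_bounded(n_tasks, limit):
    # At most `limit` fake fetches may be inside the semaphore at any time.
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(fake_fetch(sem, stats) for _ in range(n_tasks)))
    return stats["peak"]


if __name__ == "__main__":
    # Prints the highest number of fetches that were in flight together.
    print(asyncio.run(run_bounded(n_tasks=50, limit=5)))
```

This is written as a standalone script. In a notebook, where a loop is already running, `await run_bounded(50, 5)` in a cell should do the same job.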



# Invent asyncio from scratch in 10 minutes or less

In [None]:
def func1( start, end, step_size):
    for x in range(start,end,step_size):
        yield x

In [None]:
generator = func1(0,100,2)
print(generator)

In [None]:
next(generator)

In [None]:
import random

func_to_run = [func1(0,100,2), func1(1,101,2)]

def my_event_loop():
    for step_count in range(0,10):
        f = random.choice(func_to_run)
        print(next(f))

In [None]:
my_event_loop()

## What would mess it up?

One of the functions taking a long time every once in a while. A blocking call in one generator stalls all the others too.

In [None]:
import time
import random

def func1( start, end, step_size):
    for x in range(start,end,step_size):
        yield x
        time.sleep(random.random())


In [None]:
func_to_run = [func1(0,100,2), func1(1,101,2)]

def my_event_loop():
    for step_count in range(0,10):
        f = random.choice(func_to_run)
        print(next(f))

In [None]:
my_event_loop()

## How could we fix it?

In [None]:
# Replace the blocking sleep with a non-blocking version

def async_sleep(delay):
    start_time = time.time()
    yield
    while time.time() - start_time < delay:
        yield
        

In [None]:
# async_sleep(some_number) ==> generator
generator = async_sleep(1000)
print(generator)

In [None]:
# We can invoke the generator by hand
result = next(generator)
print(result)

In [None]:
# Change our function to use the non-blocking sleep generator
def func1( start, end, step_size):
    for x in range(start,end,step_size):
        yield x
        yield from async_sleep(random.random())

func_to_run = [func1(0,100,2), func1(1,101,2)]

def my_event_loop():
    for step_count in range(0,10):
        f = random.choice(func_to_run)
        print(next(f))

In [None]:
# Did we get it?  Does it work?
my_event_loop()

## Our async sleep yields None quite a bit.
We can imagine several ways to fix this in the code; let's just do something simple for now.

In [None]:
def my_event_loop():
    step_count = 0
    while step_count < 50:
        f = random.choice(func_to_run)
        result = next(f)
        if result is not None:
            step_count += 1
            print(result)

In [None]:
# Did we get it?  Does it work?
my_event_loop()

In [None]:
# Did we get it?  Does it work?  Let's try lots of "coroutines"
func_to_run = [
    func1(0,100,2), 
    func1(0,100,2), 
    func1(0,100,2), 
    func1(0,100,2), 
    func1(0,100,2), 
    func1(0,100,2), 
    func1(0,100,2), 
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
    func1(1,100,2),
]
my_event_loop()
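
The loops above pick a generator at random and stop after a fixed number of results, so they never deal with a generator that runs out (that would surface as a `StopIteration`). Here is a rough sketch of one way to handle that, using a round-robin queue. The names `counter` and `run_all` are hypothetical, and this is a toy scheduler, not how asyncio itself is implemented.

```python
import time
from collections import deque


def async_sleep(delay):
    # Same idea as above: keep yielding until `delay` seconds have passed.
    start_time = time.time()
    yield
    while time.time() - start_time < delay:
        yield


def counter(start, end, step_size, delay=0.001):
    # Like func1, but with a short fixed delay so the demo finishes quickly.
    for x in range(start, end, step_size):
        yield x
        yield from async_sleep(delay)


def run_all(generators):
    # Round-robin: give each generator one step, then move it to the back.
    ready = deque(generators)
    results = []
    while ready:
        gen = ready.popleft()
        try:
            value = next(gen)
        except StopIteration:
            continue  # this generator is finished, so do not reschedule it
        if value is not None:
            results.append(value)
        ready.append(gen)
    return results


if __name__ == "__main__":
    print(run_all([counter(0, 10, 2), counter(1, 10, 2)]))
```

The queue keeps things fair: no generator gets a second step until every other ready generator has had one.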


# You have now written the heart of asyncio

Okay, that's not totally true, but it's true enough to convey the central concept.
Let's refactor the code into "real" asyncio code.


In [None]:
import asyncio

# Change the function slightly
async def func1( start, end, step_size):
    for x in range(start,end,step_size):
        print(x)
        await asyncio.sleep(random.random())

In [None]:
# Calling an async def function returns a "coroutine" object instead of a "generator".  Don't let that throw you.
coroutine = func1(0,50,2)
print(coroutine)

In [None]:
# The event loop is provided for you
my_event_loop = asyncio.get_event_loop()

# We create a "Task" -- represents a scheduled coroutine
future_result = my_event_loop.create_task(func1(0,50,2))

print(future_result)

In [None]:
# Tasks are a type of Future.  You can ask about their state
future_result.done()

In [None]:
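# And you can ask about their result once done() is True.
# Calling result() earlier raises asyncio.InvalidStateError.
# func1 has no return statement, so the result here is None.
print(future_result.result())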
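
Outside a notebook you usually don't poll `done()`; you `await` the task (or several of them) and let the loop run until the results exist. Here is a small sketch of that, assuming a standalone script. The coroutine `count` is a hypothetical variant of `func1` that records values in a list instead of printing them.

```python
import asyncio
import random


async def count(start, end, step_size, seen):
    # Same shape as func1, but appends to `seen` and returns how many it saw.
    n = 0
    for x in range(start, end, step_size):
        seen.append(x)
        n += 1
        await asyncio.sleep(random.random() / 1000)
    return n


async def main():
    seen = []
    evens = asyncio.create_task(count(0, 10, 2, seen))
    odds = asyncio.create_task(count(1, 10, 2, seen))
    # Both tasks are scheduled here, but neither has started running yet.
    counts = await asyncio.gather(evens, odds)
    # After gather returns, both tasks are done and result() is safe to call.
    return seen, counts


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The order of values in `seen` varies from run to run because the sleeps are random, which is the interleaving the hand-written loop was imitating.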

## Let's rewrite fibonacci as an asyncio function

In [None]:
import asyncio

loop = asyncio.get_event_loop()  

async def fibonacci(n):
    await asyncio.sleep(0)
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        val1 = await fibonacci(n - 1)
        val2 = await fibonacci(n - 2)
        return val1 + val2



In [None]:
# Schedule the task
task = loop.create_task(fibonacci(20))

In [None]:
print(task.done())
# result() raises asyncio.InvalidStateError until done() is True
print(task.result())
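
The `await asyncio.sleep(0)` at the top of `fibonacci` is what lets other work run between the recursive calls. One way to see that, sketched below for a standalone script, is to run a hypothetical `ticker` task alongside it and count how often the ticker gets a turn. The names `ticker` and `main` are made up for this sketch.

```python
import asyncio


async def fibonacci(n):
    await asyncio.sleep(0)  # hand control back to the event loop
    if n < 2:
        return n
    return await fibonacci(n - 1) + await fibonacci(n - 2)


async def ticker(counter):
    # Hypothetical background task: counts how many turns it is given.
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(0)


async def main(n):
    counter = {"ticks": 0}
    tick_task = asyncio.create_task(ticker(counter))
    value = await fibonacci(n)
    tick_task.cancel()  # the ticker never finishes on its own
    return value, counter["ticks"]


if __name__ == "__main__":
    print(asyncio.run(main(10)))
```

If you remove the `sleep(0)` from `fibonacci`, nothing in it ever yields to the loop, so the ticker should get no turns while the computation runs.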

# Important notes on the event loop

[official docs](https://docs.python.org/3/library/asyncio-eventloop.html)


## Event loops are somewhat limited inside the Jupyter notebook (because a loop is already running)

Outside a notebook, the event loop methods you are most likely to use are:

```python
loop.run_forever()
loop.run_until_complete(future)
```
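
As a rough illustration of `run_until_complete` in a plain script (not a notebook), the sketch below creates its own loop, runs one coroutine to completion, and closes the loop. The names `add_later` and `run_in_fresh_loop` are hypothetical helpers for this example only.

```python
import asyncio


async def add_later(a, b):
    # Pretend to wait on something, then produce a result.
    await asyncio.sleep(0.01)
    return a + b


def run_in_fresh_loop(coro):
    # Create a private loop, run the coroutine until it finishes, clean up.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_in_fresh_loop(add_later(2, 3)))
```

On Python 3.7 and later, `asyncio.run(add_later(2, 3))` covers the same common case with less ceremony.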

## Some simple, useful ways to run functions

### call_soon

```python
import asyncio

def hello_world(loop):
    print('Hello World')

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

```

### call_later
```python
import asyncio

def hello_world(loop):
    print('Hello World')

loop = asyncio.get_event_loop()

# Schedule a call to hello_world() after delay_in_seconds
delay_in_seconds = 5
loop.call_later(delay_in_seconds, hello_world, loop)
```
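
Neither snippet above starts the loop, so in a plain script nothing would be printed. In Jupyter the already-running loop picks the callbacks up. Here is a small standalone sketch that combines the two calls and uses `run_forever()` with `loop.stop()`. The helper names `record` and `demo` are made up for illustration.

```python
import asyncio


def record(log, label, loop=None):
    # Plain (non-async) callback: note that it ran, optionally stop the loop.
    log.append(label)
    if loop is not None:
        loop.stop()


def demo():
    log = []
    loop = asyncio.new_event_loop()
    try:
        # Registered first, but it only fires after 0.05 seconds.
        loop.call_later(0.05, record, log, "later", loop)
        # Runs on the next pass of the loop.
        loop.call_soon(record, log, "soon")
        loop.run_forever()  # returns once a callback calls loop.stop()
    finally:
        loop.close()
    return log


if __name__ == "__main__":
    print(demo())
```

Note that `call_soon` and `call_later` take ordinary functions, not coroutines; extra positional arguments are passed through to the callback.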


## There are MANY other methods on the event loop, many of them low-level ones you won't use


# Don't use blocking functions

```python
# no
time.sleep(3)
# yes
await asyncio.sleep(3)

# no (if fetching each item from y blocks)
for x in y:
    ...
# yes
async for x in y:
    ...

# no
with blocking_context_thing:
    ...
# yes
async with async_context_thing:
    ...

# no
with open('some_file.txt', 'r') as f:
    for line in f:
        print(line)
# yes (aiofiles is a third-party package)
async with aiofiles.open('some_file.txt', mode='r') as f:
    async for line in f:
        print(line)
```
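
To make the cost of a blocking call concrete, here is a small timing sketch for a standalone script. It runs the same number of workers twice: once with `asyncio.sleep`, once with `time.sleep`. The names `polite`, `rude` and `timed` are hypothetical, and the exact timings will vary by machine.

```python
import asyncio
import time


async def polite(delay):
    await asyncio.sleep(delay)  # yields to the event loop while waiting


async def rude(delay):
    time.sleep(delay)  # blocks the event loop; nothing else can run


async def timed(worker, n, delay):
    # Run n workers "concurrently" and return the elapsed wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("asyncio.sleep:", asyncio.run(timed(polite, 5, 0.05)))
    print("time.sleep:   ", asyncio.run(timed(rude, 5, 0.05)))
```

The polite workers overlap their waits, so the total should be close to a single delay. The rude workers run one after another, so the total is roughly the sum of all the delays.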


# You CAN use threads and processes with asyncio

In [None]:
import asyncio
import time

def function_with_blocking_stuff():
    time.sleep(3)
    print("Did blocking thing")
    
loop = asyncio.get_event_loop()

# Passing None selects the loop's default executor (a thread pool)
my_future = loop.run_in_executor(None, function_with_blocking_stuff)

In [None]:
import asyncio
import concurrent.futures
import time

def function_with_blocking_stuff():
    time.sleep(3)
    print("Did blocking thing")
    
loop = asyncio.get_event_loop()

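# Leaving the with block calls executor.shutdown(wait=True), so this cell
# blocks until function_with_blocking_stuff has finished.
with concurrent.futures.ThreadPoolExecutor() as executor:
    my_future = loop.run_in_executor(executor, function_with_blocking_stuff)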
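
The future returned by `run_in_executor` can be awaited like any other, which is how the blocking work gets back into async code. Here is a minimal sketch for a standalone script, using the default thread pool. The names `blocking_work` and `main` are hypothetical.

```python
import asyncio
import time


def blocking_work(label):
    # Ordinary blocking function; it runs in a worker thread, not on the loop.
    time.sleep(0.05)
    return label


async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # None selects the default executor; each call returns an awaitable future.
    futures = [loop.run_in_executor(None, blocking_work, i) for i in range(3)]
    results = await asyncio.gather(*futures)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(main()))
```

`run_in_executor` also accepts a `concurrent.futures.ProcessPoolExecutor` in place of `None`, as long as the function and its arguments can be pickled. That is the route to using more than one core from asyncio.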



# Lessons learned
1. There isn't any magic
2. You can't use anything blocking on the event loop (hand it to an executor instead)
3. A single thread can do a lot