In [1]:
# Widen the notebook container so slides and wide figures use 90% of the window.
# `display` and `HTML` come from the public `IPython.display` module; importing
# them from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

# A practical guide to speed up your application
# with Asyncio

By Niels Denissen

# Niels Denissen

<center><img src="https://media.licdn.com/media/AAEAAQAAAAAAAAd8AAAAJDAyODhlNzFlLWYwYTktNDAxNC1iYjBkLWFhNTdlMWZhNmQzNg.jpg" style="width: 350px;" alt="Drawing"></center>

<table style="width:60%;border:0px;">
  <tr style="border:0px;">
    <td style="border:0px;"><img src="./img/wbaa.png" style="width:350px"></td>
    <td style="border:0px;">Data Engineer / Scientist</td>
  </tr>
  <tr style="border:0px;">
    <td style="border:0px;"><img src="./img/uu.png" style="width:250px"></td>
    <td style="border:0px;">Msc. Artificial Intelligence</td>
  </tr>
  <tr border="0" style="border:0px;">
    <td style="border:0px;"><img src="./img/tue.svg" style="width:200px"></td>
    <td style="border:0px;">Bsc. Computer Science</td>
  </tr>
</table>

# At the end of this session, you will...
* Understand the fundamentals of async programming
* Know when it will speed up your program
* Be able to change your sync code to async code
* (if we go really fast...) know how to circumvent typical pitfalls

<center><img width=90% src="./img/Slide/Slide1.jpg"></center>

<center><img width=90% src="./img/Slide/Slide2.jpg"></center>

# Why would you want this?

In general for applications that rely heavily on I/O.

<center><img src="./img/results_bar.png"></center>

# How does it work?!

<img src="./img/Slide/Slide3.jpg">

<img src="./img/Slide/Slide4.jpg">

<img src="./img/Slide/Slide5.jpg">

<img src="./img/Slide/Slide6.jpg">

<img src="./img/Slide/Slide7.jpg">

<img src="./img/Slide/Slide8.jpg">

# Key things to remember:

1. **Program async tasks**: i.e., when to give control back to the loop
2. **Create an event loop**: it controls stuff for you
3. **Assign tasks to event loop**: tell the loop which tasks it has to run
4. **Run event loop**: specifying when it should stop

# Let's look at some code
Starting with a regular synchronous example

In [None]:
import time

def pydataTalk(sleep_time):
    """Print the three phases of the talk, blocking `sleep_time` seconds between them.

    Returns the string "Knowledge" after the last phase has been printed.
    """
    phases = ("Welcome", "?", "Profit $")
    for position, phase in enumerate(phases):
        if position:
            # Blocking pause between consecutive phases (none before the first)
            time.sleep(sleep_time)
        print(phase)
    return "Knowledge"

pydataTalk(sleep_time=1)

**Speaker note:** See this as a process (me) doing something with data (saying words). This sleep in between my slides represents a database query waiting for the database to return the data.

## Now what does this look like asynchronously?
* **```async```** in front of function you want to run asynchronously
* **```await```** inside function to give back control to loop

Let's rewrite the function ```pydataTalk``` to its async equivalent together

In [None]:
# Rewrite this function to its async equivalent
import time

# 1. Program async tasks: i.e, when to give control back to loop
def pydataTalk(sleep_time):
    """Synchronous starting point of the exercise.

    Prints the three talk phases with a blocking pause of `sleep_time` seconds
    between them and returns "Knowledge".
    """
    phases = ("Welcome", "?", "Profit $")
    for position, phase in enumerate(phases):
        if position:
            # This blocking sleep is the call to replace when going async
            time.sleep(sleep_time)
        print(phase)
    return "Knowledge"

In [None]:
# What happens if we call? ...
pydataTalk(sleep_time=1)

In [None]:
# 2. Create an event loop: it controls stuff for you

# 3. Assign tasks to event loop: tell loop which tasks it has to run

# 4. Run event loop: specifying when it should stop


In [None]:
# Back-up solution

# 1. Program async tasks: i.e, when to give control back to loop
import asyncio

async def pydataTalk(sleep_time):
    """Coroutine version of the talk.

    Prints the three phases and, between them, awaits `asyncio.sleep(sleep_time)`
    so the event loop can run other tasks. Returns "Knowledge".
    """
    phases = ("Welcome", "?", "Profit $")
    for position, phase in enumerate(phases):
        if position:
            # Give control back to the loop between consecutive phases
            await asyncio.sleep(sleep_time)
        print(phase)
    return "Knowledge"


# 2. Create an event loop: it controls stuff for you
loop = asyncio.get_event_loop()

# 3. Assign tasks to event loop: tell loop which tasks it has to run
# Calling pydataTalk(...) only creates the coroutine; create_task schedules it.
taskA = loop.create_task(pydataTalk(sleep_time=0.5))

# 4. Run event loop: specifying when it should stop
# The value returned here is the task's result ("Knowledge").
# NOTE(review): if the notebook kernel already runs its own event loop this call
# may raise RuntimeError — confirm against the Jupyter/ipykernel version in use.
loop.run_until_complete(taskA)

# Let's add another async task
What will happen here?

In [None]:
async def drinkWater(sleep_time):
    """Take two sips, awaiting `asyncio.sleep(sleep_time)` between them.

    Returns the string "Bathroom break".
    """
    sips = ("1st sip", "2nd sip")
    print(sips[0])
    for sip in sips[1:]:
        # Non-blocking pause: other tasks may run while we wait
        await asyncio.sleep(sleep_time)
        print(sip)
    return "Bathroom break"

In [None]:
# Calling the async functions only creates coroutine objects; nothing runs yet
talk = pydataTalk(sleep_time=1)
drink = drinkWater(sleep_time=1)
# Schedule both coroutines on the loop
task_talk = loop.create_task(talk)
task_drink = loop.create_task(drink)
# Run the loop only until the drink task is done; task_talk is scheduled on the
# same loop and advances meanwhile, but this call does not wait for it
loop.run_until_complete(task_drink)

In [None]:
loop.run_until_complete(task_talk)

# Proper way of executing multiple tasks

In [None]:
# Fresh coroutine objects: a coroutine can only be run once
talk = pydataTalk(sleep_time=1)
drink = drinkWater(sleep_time=1)
task_talk = loop.create_task(talk)
task_drink = loop.create_task(drink)
# gather() combines both tasks into one awaitable, so the loop runs until BOTH
# are finished; the returned list holds their results in argument order
loop.run_until_complete(asyncio.gather(task_talk, task_drink))

# Key things to remember (with code):

1) **Program async tasks**: i.e, when to give control back to loop  
- **```async```** in front of function (```async def functionA():```)
- **```await```** to give back control to loop

2) **Create an event loop**: it controls stuff for you
- ```loop = asyncio.get_event_loop()```

3) **Assign tasks to event loop**: tell loop which tasks it has to run
- ```taskA = loop.create_task(functionA())```

4) **Run event loop**: specifying when it should stop
- ```loop.run_until_complete(taskA)```

# On to a more realistic example!

Suppose we want to scrape marktplaats (eBay) pages and write the results to a database.

**Prerequisite:** Make sure to start postgres

In [None]:
import requests
from bs4 import BeautifulSoup

def get_page_soup(url, session):
    """
    Try to get the given url and return the souped version.
    If an error occurred, return None.

    Args:
        url: Address of the page to download.
        session: Object with a `get(url)` method whose result exposes `.text`
            (the generator in this notebook passes a `requests.Session`).

    Returns:
        The page parsed by BeautifulSoup, or None when fetching/parsing failed.
    """
    try:
        # Get HTML page
        response = session.get(url)

        # Parse the page using beautiful soup
        return BeautifulSoup(response.text, "html.parser")
    except Exception:
        # Best-effort scraping: report the failing url and carry on. Only
        # `Exception` is caught so that KeyboardInterrupt / SystemExit can
        # still stop a long-running scrape.
        print("Couldn't find url: {}".format(url))
        return None

In [None]:
def gen_data(pages=range(1, 200)):
    """
    Data generator: reads pages from marktplaats and yields descriptions given with products

    Args:
        pages: Iterable of result-page numbers to scrape. Defaults to pages
            1..199; an immutable range is used so the shared default can never
            be modified between calls.

    Yields:
        str: text of every 'vip-ad-description' div found on the article pages.
    """

    # Specification of base url
    base_url = "http://www.marktplaats.nl/z/motoren.html?categoryId=678&currentPage="

    # Use same session throughout the lifetime of the generator
    with requests.Session() as session:
        for page in pages:
            soup = get_page_soup(base_url + str(page), session)
            if soup is None:
                continue

            # Run through all articles
            for article in soup.find_all('article'):
                # Articles without a 'data-url' attribute cannot be followed: skip
                # them. Looking the key up explicitly (instead of wrapping the whole
                # body in `except KeyError`) keeps unrelated KeyErrors visible.
                article_url = article.attrs.get('data-url')
                if article_url is None:
                    continue

                # Get souped version of the article page
                article_soup = get_page_soup(article_url, session)
                if article_soup is None:
                    continue

                # Run through all descriptions
                for a in article_soup.find_all('div', {'id': 'vip-ad-description'}):
                    if a.text is not None:
                        yield str(a.text)
        
# Calling the generator function only creates the generator object; its body
# (and therefore the first HTTP request) does not run until next() is called.
data_generator = gen_data()

In [None]:
next(data_generator)

## Synchronous

In [None]:
import psycopg2

class Sync:
    """Blocking Postgres client, used as the baseline of the speed comparison.

    Every public method opens its own connection and closes it when done.
    psycopg2's `with connection:` block only commits (or rolls back on error);
    it does not close the connection, hence the explicit `close()` calls.
    """
    # DSN for postgres
    postgres_dsn = "dbname='postgres' user='postgres' password='pydata' host='localhost'"

    def _write(self, page, conn):
        """
        Private function to write all descriptions from marktplaats page to database

        Args:
            page: Number of the result page to scrape.
            conn: Open psycopg2 connection used for the inserts.

        Returns:
            Number of descriptions inserted.
        """
        description_counter = 0
        with conn.cursor() as cursor:
            for data_entry in gen_data(pages=[page]):
                cursor.execute("""INSERT INTO pydata(text) VALUES (%s)""", [data_entry])
                description_counter += 1

        return description_counter

    def write_single(self, page):
        """
        Write single page

        Returns the number of descriptions written for `page`.
        """
        conn = psycopg2.connect(dsn=self.postgres_dsn)
        try:
            # Commit on success, roll back on error
            with conn:
                return self._write(page, conn)
        finally:
            conn.close()

    def write_everything(self, n_pages=8):
        """
        Write specified number of pages

        Pages 1..n_pages are written one after the other over a single
        connection. Returns the total number of descriptions written.
        """
        description_counters = []
        conn = psycopg2.connect(dsn=self.postgres_dsn)
        try:
            # Commit on success, roll back on error
            with conn:
                for page in range(1, n_pages+1):
                    description_counters.append(self._write(page, conn))
        finally:
            conn.close()

        return sum(description_counters)

    def read(self):
        """
        Read all from postgres

        Returns the rows of `SELECT text FROM pydata` as returned by fetchall().
        """
        conn = psycopg2.connect(dsn=self.postgres_dsn)
        try:
            with conn:
                with conn.cursor() as cursor:
                    cursor.execute("SELECT text FROM pydata;")
                    return cursor.fetchall()
        finally:
            conn.close()

## Multi-threaded

In [None]:
from multiprocessing import Pool

class Threads:
    """Runs one synchronous `Sync` client per page in a pool of workers.

    Note: `Pool` is imported from `multiprocessing`, so despite the naming used
    here the workers are separate processes, not threads.
    """
    def write_everything(self, n_pages=8, no_threads=8):
        """
        Start a worker pool of the specified size and run the synchronous client
        for each of the pages 1..n_pages in it.

        Args:
            n_pages: Number of result pages to write.
            no_threads: Number of workers in the pool.

        Returns:
            Total number of descriptions written by all workers.
        """
        # The context manager terminates the workers on exit, also when map()
        # raises, so no worker is left behind after a failure.
        with Pool(processes=no_threads) as p:
            description_counters = p.map(Sync().write_single, range(1,n_pages+1))

        # Return the sum of each result (number of articles done)
        return sum(description_counters)

## Asynchronous

For the asynchronous part we'll have to rewrite our generator. This needs to be asynchronous as well (asynchronous generators are supported since Python 3.6: https://www.python.org/dev/peps/pep-0525/)

In [None]:
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def get_page_soup_async(url, session, sem):
    """
    Try to get the given url and return the souped version.
    If an error occurred, return None.

    Args:
        url: Address of the page to download.
        session: Object whose `get(url)` returns an async context manager
            yielding a response with an awaitable `text()` (the generator in
            this notebook passes an `aiohttp.ClientSession`).
        sem: Async context manager limiting concurrent requests
            (an `asyncio.Semaphore` in this notebook).

    Returns:
        The page parsed by BeautifulSoup, or None when fetching/parsing failed.
    """
    try:
        # Get HTML page using semaphore to limit the open connections at a time
        async with sem:
            async with session.get(url) as resp:
                response = await resp.text()

        # Parse the page using beautiful soup
        return BeautifulSoup(response, "html.parser")
    except asyncio.CancelledError:
        # Cancellation is not a download error: it must reach the task machinery,
        # otherwise a cancelled task would silently keep running.
        raise
    except Exception:
        # Best-effort scraping: report the failing url and carry on
        print("Couldn't find url: {}".format(url))
        return None

In [None]:
async def gen_data_async(pages=range(1, 200)):
    """
    Data generator: reads pages from marktplaats and yields descriptions given with products

    Args:
        pages: Iterable of result-page numbers to scrape. Defaults to pages
            1..199; an immutable range is used so the shared default can never
            be modified between calls.

    Yields:
        str: text of every 'vip-ad-description' div found on the article pages.
    """
    # Specification of base url
    base_url = "http://www.marktplaats.nl/z/motoren.html?categoryId=678&currentPage="

    # Use same session throughout the lifetime of the generator
    async with aiohttp.ClientSession() as session:
        # create instance of Semaphore to limit amount of open http connections
        # NOTE(review): the semaphore is private to this generator call and every
        # request below is awaited before the next one starts, so the limit of
        # 1000 is never reached from here.
        sem = asyncio.Semaphore(1000)

        for page in pages:
            response_soup = await get_page_soup_async(base_url + str(page), session, sem)
            if response_soup is None:
                continue

            # Run through all articles
            for article in response_soup.find_all('article'):
                # Articles without a 'data-url' attribute cannot be followed: skip
                # them. Looking the key up explicitly (instead of wrapping the whole
                # body in `except KeyError`) keeps unrelated KeyErrors visible.
                article_url = article.attrs.get('data-url')
                if article_url is None:
                    continue

                # Get souped version of the article page
                article_soup = await get_page_soup_async(article_url, session, sem)
                if article_soup is None:
                    continue

                # Run through all descriptions
                for a in article_soup.find_all('div', {'id': 'vip-ad-description'}):
                    if a.text is not None:
                        yield str(a.text)

In [None]:
import asyncio
import asyncpg

class Async:
    """asyncio/asyncpg client that writes all requested pages concurrently."""

    async def _write(self, page):
        """
        Write descriptions of page to database

        A connection is taken from `self.conn_pool_async` and all inserts for
        the page run inside one transaction.

        Returns:
            Number of descriptions inserted for `page`.
        """
        description_counter = 0
        async with self.conn_pool_async.acquire() as conn_async:
            async with conn_async.transaction():
                async for data_entry in gen_data_async(pages=[page]):
                    await conn_async.execute("""INSERT INTO pydata(text) VALUES ($1)""", data_entry)
                    description_counter += 1

        return description_counter

    def write_everything(self, n_pages=8):
        """
        Write multiple pages asynchronously to the database

        Creates a fresh event loop and connection pool, runs one `_write` task
        per page (1..n_pages) and releases pool and loop afterwards, also when
        one of the tasks fails.

        Returns:
            Total number of descriptions written.
        """
        # Create new loop and tell asyncio to use it
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        try:
            # Create the connection pool
            self.conn_pool_async = self.loop.run_until_complete(
                asyncpg.create_pool(min_size=1, max_size=100, host='localhost', port=5432,
                                    user='postgres', password='pydata', database='postgres'))
            try:
                # Create a task per page and run until all of them complete
                tasks = [self.loop.create_task(self._write(p)) for p in range(1, n_pages+1)]
                description_counters = self.loop.run_until_complete(asyncio.gather(*tasks))
            finally:
                # Close the pooled connections so repeated benchmark runs do
                # not pile up open connections on the server
                self.loop.run_until_complete(self.conn_pool_async.close())
        finally:
            # Close the loop
            self.loop.close()

        return sum(description_counters)

## Speed comparison

First we define some functions to time the different implementations.

In [None]:
import time

def time_sync(n_pages):
    """Time `Sync().write_everything` for `n_pages` pages.

    Returns:
        (n_written, runtime): descriptions written and elapsed seconds.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # durations; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    n_written = Sync().write_everything(n_pages=n_pages)
    runtime = time.perf_counter() - start_time
    return (n_written, runtime)

def time_threads(n_pages, n_threads):
    """Time `Threads().write_everything` for `n_pages` pages and `n_threads` workers.

    Returns:
        (n_written, runtime): descriptions written and elapsed seconds.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # durations; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    n_written = Threads().write_everything(n_pages=n_pages, no_threads=n_threads)
    runtime = time.perf_counter() - start_time
    return (n_written, runtime)

def time_async(n_pages):
    """Time `Async().write_everything` for `n_pages` pages.

    Returns:
        (n_written, runtime): descriptions written and elapsed seconds.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # durations; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    n_written = Async().write_everything(n_pages=n_pages)
    runtime = time.perf_counter() - start_time
    return (n_written, runtime)

Now we'll run each implementation for various numbers of pages, and the multi-threaded one for various numbers of threads as well.

In [None]:
import pandas as pd

# One row per measurement: [type, n_written, n_threads, time]
result_list = []

# Run through range of pages
for n_pages in [1,2,4,8,16,32,64,128]:
    print("Processing n_pages={}".format(n_pages))
    
    print("\t Async")
    n_written, runtime = time_async(n_pages)
    # async and sync runs are recorded with n_threads = 1
    result_list.append(["async", n_written, 1, runtime])
    
    print("\t Sync")
    n_written, runtime = time_sync(n_pages)
    result_list.append(["sync", n_written, 1, runtime])
    
    # The pooled variant is measured once per pool size
    for n_threads in [2,4,8]:
        print("\t Threads: {}".format(n_threads))
        n_written, runtime = time_threads(n_pages, n_threads)
        result_list.append(["threaded", n_written, n_threads, runtime])
        
# Write all to a pandas Dataframe
results_df = pd.DataFrame(result_list, columns=["type", "n_written","n_threads","time"])

In [None]:
import pickle
# Persist the benchmark results; the `with` block guarantees the file handle is
# flushed and closed, which a bare open() passed to dump() does not.
with open("data/results_df.p","wb") as results_file:
    pickle.dump(results_df, results_file)

In [None]:
# Total articles written and total time per (type, n_threads) combination,
# plus the derived average number of seconds spent per article
results_df_aggr = (
    results_df
    .groupby(["type","n_threads"])
    .sum()
    .assign(sec_article=lambda totals: totals["time"] / totals["n_written"])
)

In [None]:
# matplotlib is imported here so this cell also runs on a fresh kernel
# (it previously relied on an import made in a later cell)
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1,1, figsize=(15,5))

# One bar per (type, n_threads) group of results_df_aggr
bar_positions = [i+0.8 for i in range(len(results_df_aggr.sec_article))]

# Plot bar chart of time spent per article.
# The positions are passed positionally: the keyword for them was `left` in old
# matplotlib releases and is `x` in current ones.
ax.bar(bar_positions,
       height=results_df_aggr.sec_article,
       width=0.9,
       color=['green','blue','orange','red','black'])

ax.set_xticks(bar_positions)
ax.set_xticklabels(["Async","Sync - 1 Thread","Sync - 2 Threads","Sync - 4 Threads","Sync - 8 Threads"])
# No legend: the bars carry no labels, the tick labels identify them
plt.ylabel("Time per article (s)")
plt.savefig('results_bar.png',dpi=100)
plt.show()

<center><img src="./img/results_bar.png"></center>

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

# Define plot and colors to use later on
fig, ax = plt.subplots(1,1, figsize=(15,5))
# Line colour per series: pool sizes are keyed by their number, the
# single-worker runs by their type name
colors = {8:"black", 4:"red", 2:"orange", "sync": "blue", "async": "green"}

# Plot sync
results_sync = results_df[results_df.type == "sync"]
ax.plot(results_sync.n_written, results_sync.time, '-', color=colors['sync'], label='sync')

# Plot threads
# n_threads > 1 selects exactly the pooled runs (sync/async were stored with 1)
for tu in results_df.n_threads[results_df.n_threads > 1].unique():
    results_threads = results_df[(results_df.type == "threaded") & (results_df.n_threads == tu)]
    ax.plot(results_threads.n_written, results_threads.time, '-', color=colors[tu], label=str(tu)+' threads')
    
# Plot async
results_async = results_df[results_df.type == "async"]
ax.plot(results_async.n_written, results_async.time, '-', color=colors['async'], label='async')

plt.legend(loc='best')
plt.xlabel("Articles written (#)")
plt.ylabel("Time (s)")
# The figure is written to the working directory
plt.savefig('results_line.png',dpi=100)
plt.show()

<center><img src="./img/results_line.png"></center>

## The solution to our Marktplaats problem

In [None]:
# Restart the database and gather all results anew here:
Async().write_everything(n_pages=150)

In [None]:
import re
from stop_words import get_stop_words

# Everything that is not an ASCII letter or digit is stripped from a word
regex = re.compile('[^a-zA-Z0-9]')
# A set gives constant-time membership tests in the filter below
stop_words = set(get_stop_words('dutch'))

# split() without argument splits on any whitespace. Splitting on ' ' only would
# leave words separated by newlines or tabs in one chunk, and the regex would
# then glue them together into a single bogus word.
words = [regex.sub('', word.lower()) for item in Sync().read() for word in item[0].split()]
# Drop chunks that became empty after stripping, and Dutch stop words
words = [word for word in words if len(word) > 0 and word not in stop_words]

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
%matplotlib inline

# Count every word; ascending order puts the most frequent word at the top bar
count_words_sorted = pd.Series(Counter(words)).sort_values(ascending=True)

# Only plot words occurring more than 1000 times
high_counts = count_words_sorted[count_words_sorted.values > 1000]

fig, ax = plt.subplots(1,1, figsize=(15,5))
bar_positions = [i+0.8 for i in range(len(high_counts))]
# The positions are passed positionally: the keyword for them was `bottom` in old
# matplotlib releases and is `y` in current ones.
ax.barh(bar_positions, width=high_counts.values, height=0.9)
ax.set_yticks(bar_positions)
ax.set_yticklabels(high_counts.index)

plt.xlabel("Occurrences in articles (#)")
plt.savefig('results_marktplaats.png',dpi=100)
plt.show()

<center><img src="./img/results_marktplaats.png"></center>

# Learnings from practice

Here I'll list some things I've encountered in practice and make some suggestions on how you could solve them:
- Exception handling
- Passing data between tasks
- Control your tasks

## Exception handling

Handling exceptions raised in tasks of a loop can be very confusing.

In [None]:
import asyncio

async def pleaseExceptMe(sleep_time):
    """Announce itself, await `asyncio.sleep(sleep_time)`, then always raise.

    Raises:
        Exception: with the message "You shall not PASS!".
    """
    failure = Exception("You shall not PASS!")
    print("\t Wait for it ...")
    # Non-blocking pause before failing
    await asyncio.sleep(sleep_time)
    print("\t Incoming!")
    raise failure

In [None]:
loop = asyncio.get_event_loop()

# Create except task and run it
task_except = loop.create_task(pleaseExceptMe(sleep_time=2))
# run_until_complete hands back the task's outcome, so the Exception raised
# inside pleaseExceptMe is re-raised here: this cell is expected to fail
loop.run_until_complete(task_except)

What if we want to run something indefinitely (like our server example)?

In [None]:
async def pydataTalk(sleep_time):
    """Never-ending version of the talk.

    Repeats forever: print "Welcome", pause, print "?", pause, print "Profit $".
    Each pause awaits `asyncio.sleep(sleep_time)`; the coroutine never returns.
    """
    while True:
        for phase in ("Welcome", "?"):
            print(phase)
            await asyncio.sleep(sleep_time)
        print("Profit $")

We need a stopper to stop the loop after a specified amount of time. Otherwise it runs forever.

In [None]:
async def stopper(loop_object, sleep_time):
    """Wait `sleep_time` seconds without blocking, then call `loop_object.stop()`."""
    delay = asyncio.sleep(sleep_time)
    await delay
    loop_object.stop()

So what would happen now?

In [None]:
# Create new loop and tell asyncio to use it
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Create tasks and run indefinitely
task_talk = loop.create_task(pydataTalk(sleep_time=1))
# This task raises after 2 seconds ...
task_except = loop.create_task(pleaseExceptMe(sleep_time=2))
# ... and this one stops the loop after 4 seconds, which ends run_forever()
task_stopper = loop.create_task(stopper(loop, sleep_time=4))
loop.run_forever()

Normally when you run a loop until a task completes with `run_until_complete` the result of the future is evaluated after execution. This evaluation could also mean throwing an error.

In [None]:
task_except.result()

Whenever you run the loop forever (`run_forever()`), the result of the future isn't necessarily evaluated and thus exceptions might not be noticed.

One way to circumvent this is to set the exception handler for the loop:

In [None]:
def _exception_handler(loop_object, context):
    loop_object.stop()
    print("Exception: {0}".format(context['exception']))
    print("Entire context exception: {0}".format(context))
    
# Set the exception handler
loop.set_exception_handler(_exception_handler)

In [None]:
# The returned task is deliberately NOT stored in a variable here
# (compare with the next cell, which keeps a reference to it)
loop.create_task(pleaseExceptMe(sleep_time=1))
loop.run_forever()

Some very weird stuff happens if you catch the returned task...

In [None]:
# Same failing coroutine, but this time a reference to its task is kept
task_except = loop.create_task(pleaseExceptMe(sleep_time=1))
# The stopper ends run_forever() after 5 seconds
task_stopper = loop.create_task(stopper(loop, sleep_time=5))
loop.run_forever()

The exception handler is never called. This is apparently due to the fact that in the first case the exception handler gets called upon garbage collection of the finished exceptMe task (http://bugs.python.org/issue28274); here the task is still referenced by `task_except`, so it is not garbage collected.

## Passing data between tasks

Use Queues

In [None]:
import asyncio

async def adder(queue):
    """ Put the numbers 0..5 on the queue, pausing 0.01 s after each one """
    next_value = 0
    while next_value < 6:
        await queue.put(next_value)
        # Short non-blocking pause so other tasks get a turn between items
        await asyncio.sleep(0.01)
        next_value += 1
        
async def reader(name, queue):
    """ Forever take items from the queue and print them, tagged with `name` """
    template = "Read by {0}: {1}"
    while True:
        # Suspends here until an item is available
        print(template.format(name, await queue.get()))

In [None]:
# Create new loop and tell asyncio to use it
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Add tasks and run until complete
queue = asyncio.Queue()
task_read1 = loop.create_task(reader("no.1", queue))
task_read2 = loop.create_task(reader("no.2", queue))
task_add = loop.create_task(adder(queue))
loop.run_until_complete(task_add)

# The readers loop forever, so cancel them and let the loop process the
# cancellation before closing it; closing a loop that still has pending tasks
# makes asyncio complain that a task "was destroyed but it is pending".
task_read1.cancel()
task_read2.cancel()
loop.run_until_complete(asyncio.gather(task_read1, task_read2, return_exceptions=True))
loop.close()

## Control your tasks

We saw already that `asyncio.gather` could combine tasks and wait for them to all finish. Using `asyncio.wait` you can have a bit more control over the tasks you dispatch to the loop. For example you can issue a timeout for tasks in the loop:

In [None]:
async def sleeper(t):
    """Await a non-blocking sleep of `t` seconds and return the string "slept"."""
    # asyncio.sleep hands back its second argument once the delay has passed
    return await asyncio.sleep(t, "slept")

# Create new loop and tell asyncio to use it
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

task = loop.create_task(sleeper(2))
# The 1-second timeout elapses before the 2-second sleeper is done; wait()
# then returns its (done, pending) sets and leaves the task pending
loop.run_until_complete(asyncio.wait([task], timeout=1))

In [None]:
task.result()

Or if you have multiple tasks and only want to wait for the first one to finish:

In [None]:
task1 = loop.create_task(sleeper(2))
task2 = loop.create_task(sleeper(4))
# Returns as soon as the first task (the 2-second sleeper) has finished
loop.run_until_complete(asyncio.wait([task1,task2], return_when=asyncio.FIRST_COMPLETED))
print('task1: ' + str(task1.result()))
# task2 is still pending at this point, so asking for its result raises
# InvalidStateError — that failure is the point of this demonstration
print('task2: ' + str(task2.result()))

Or finally, if you want to stop as soon as an exception in one of the tasks occurs:

In [None]:
async def sleeper_except(t):
    """Await a non-blocking sleep of `t` seconds, then always raise.

    Raises:
        Exception: with the message 'hello'. The coroutine never returns a value.
    """
    await asyncio.sleep(t)
    raise Exception('hello')

# task1 fails after 1 second, task2 would finish after 4 seconds
task1 = loop.create_task(sleeper_except(1))
task2 = loop.create_task(sleeper(4))
# Returns as soon as one task has raised; wait() itself does not re-raise the
# exception, it stays stored in task1 until task1.result() is called
loop.run_until_complete(asyncio.wait([task1,task2], return_when=asyncio.FIRST_EXCEPTION))

In [None]:
task1.result()

In [None]:
task2.result()

# Asyncio under the hood

This post explains the concept very well: https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/.
All code below is copy pasted from there.

Essentially a coroutine (task that runs in an eventloop) is very similar to a generator. Let's first look at a simple generator:

In [None]:
def lazy_range(up_to):
    """Yield 0, 1, 2, ... one at a time for as long as the value is below `up_to`."""
    current = 0
    while True:
        if not current < up_to:
            # Upper bound reached: finishing the generator raises StopIteration
            return
        yield current
        current += 1
        
generator = lazy_range(2)
print(next(generator))
print(next(generator))
# lazy_range(2) only yields 0 and 1, so this third next() raises StopIteration
print(next(generator))

But you can also send data to a generator since python 2.5:

In [None]:
def jumping_range(up_to):
    """Yield increasing integers starting at 0 while they stay below `up_to`.

    A value passed in with `send()` is added to the current position instead of
    the default step of 1, so the sequence jumps ahead by that amount.
    """
    position = 0
    while position < up_to:
        received = yield position
        # Plain next() delivers None, which means "advance by one"
        position += 1 if received is None else received

In [None]:
generator = jumping_range(10)
print(next(generator))
# send(5) jumps from 0 to 5 and returns that 5; the return value is not
# printed here, so the output of this cell is 0, 6, 7
generator.send(5)
print(next(generator))
print(next(generator))

`yield from` can be used to chain generators:

In [None]:
def bottom():
    # Yield 42 to whoever drives the generator chain, then return the value
    # that is sent back in, so it travels up the call stack again.
    sent_back = yield 42
    return sent_back

def middle():
    # Delegate to bottom() and pass its return value on
    relayed = yield from bottom()
    return relayed

def top():
    # Delegate to middle() and pass its return value on
    relayed = yield from middle()
    return relayed

# Get the generator.
gen = top()  
value = next(gen)  
print(value)  # Prints '42'.  
try:  
    value = gen.send(value * 2)
except StopIteration as exc:  
    value = exc.value
print(value)  # Prints '84'. 

Put bluntly this is the concept that python uses to implement asynchronous programming. Programs yield from (await) Future objects. These future objects will at a certain point in time return a value. That only happens though, when the future is done (e.g. data is received from a database). Before that the future will yield wait times that the loop will have to wait before checking again if the future is done.

In [None]:
import datetime  
import heapq  
import types  
import time


class Task:

    """Pair a coroutine with the moment it wants to be resumed.

    Instances compare (== and <) on that moment only, which is what heapq
    needs. Plain two-item tuples would not do: when two datetime.datetime
    values are equal the comparison would fall through to the coroutines,
    which define no ordering, and an exception would be raised.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.waiting_until, self.coro = wait_until, coro

    def __eq__(self, other):
        mine, theirs = self.waiting_until, other.waiting_until
        return mine == theirs

    def __lt__(self, other):
        mine, theirs = self.waiting_until, other.waiting_until
        return mine < theirs


class SleepingLoop:

    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        """Remember the coroutines to run; nothing is started yet."""
        self._new = coros
        # Heap of Task objects, ordered by the time each wants to be resumed
        self._waiting = []

    def run_until_complete(self):
        """Run all coroutines until each of them has finished.

        Every value a coroutine yields is taken as the datetime at which it
        wants to be resumed; the loop sleeps (blocking) until the earliest one.
        """
        # Start all the coroutines.
        # NOTE(review): a coroutine that finishes without ever yielding would
        # raise StopIteration out of this first send() — it is not caught here.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                # The current time is sent in; the coroutine answers with its
                # next resumption time and is pushed back onto the heap.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def sleep(seconds):
    """Suspend the awaiting coroutine for `seconds` seconds.

    Yields the datetime at which it wants to be resumed and returns, as a
    timedelta, the time between the call and the datetime sent back in.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    started = datetime.datetime.now()
    # A bare `yield` is what pauses every coroutine on the call stack, which is
    # why this is a generator-based coroutine rather than an `async def`.
    resumed_at = yield started + datetime.timedelta(seconds=seconds)
    # Report how long the pause really took
    return resumed_at - started


async def countdown(label, length, *, delay=0):
    """Wait `delay` seconds, then count down from `length`, one second per step.

    Every line printed is prefixed with `label`.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    waited_for = await sleep(delay)
    print(label, 'starting after waiting', waited_for)
    remaining = length
    while remaining:
        print(label, 'T-minus', remaining)
        # One-second pause per step; the measured duration is not needed
        await sleep(1)
        remaining -= 1
    print(label, 'lift-off!')


def main():
    """Run three countdowns (A, B and C) on a SleepingLoop and print the total time.

    This is what a user would typically write.
    """
    launches = (
        countdown('A', 5),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    loop = SleepingLoop(*launches)
    started = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - started)

In [None]:
main()

## Implementing your own awaitable

In [None]:
import asyncio
import types
import datetime

@types.coroutine
def sleep(seconds):
    """ One way to create an awaitable: a generator marked as coroutine.

    Keeps yielding (printing "waiting" before each yield) until `seconds`
    seconds have passed, then returns "Slept " followed by the elapsed time. """

    started = datetime.datetime.now()
    deadline = started + datetime.timedelta(seconds=seconds)

    # Yield until time has passed
    while True:
        if not datetime.datetime.now() < deadline:
            break
        print("waiting")
        yield

    elapsed = datetime.datetime.now() - started
    return "Slept " + str(elapsed)


async def test_sleep(name):
    """ Await the custom sleep awaitable for 0.002 seconds and return its result.

    `name` is accepted but not used. """
    slept = await sleep(0.002)
    return slept

    
def test(name):
    """ A simple generator can be made a coroutine by the create_task function

    Prints "<name>; i: <step>" for the steps 0, 1 and 2, yielding after each
    line, and finally returns `name`. """
    step = 0
    while step < 3:
        print(str(name) + "; i: " + str(step))
        yield
        step += 1

    return name

In [None]:
loop = asyncio.get_event_loop()
# The coroutine defined above is called `test_sleep`; the previous name used
# here (`test_await`) does not exist and raised NameError.
task1 = loop.create_task(test_sleep("task1"))
task2 = loop.create_task(test("task2"))
# Run until both tasks are done; the results come back in argument order
loop.run_until_complete(asyncio.gather(task1,task2))

# References

- In-depth blog post on asyncio: https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
- Async file reading: https://github.com/Tinche/aiofiles
- Using semaphores to limit open connections aiohttp: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html
