# Exploring Async in Python with SleepSort

In our SaaS product, we often need to geocode hundreds of addresses uploaded from an Excel file by our users while they wait.

This takes a long time if done synchronously because of network round-trips. Therefore, we want it to be concurrent.

Currently we do it in threads, which is pretty straightforward using [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor). However:
1. Our web server (gunicorn) is already using threads, so it's harder to keep track of the total number of threads
2. Async has less overhead than threads

*... but really I just want to learn async better!*


So, to get familiar with async with a simpler-than-geocoding use case, I decided to look at SleepSort.

## SleepSort
SleepSort is a joke sorting algorithm that became popular thanks to [a 4chan post in 2011](https://web.archive.org/web/20151231221001/http://bl0ckeduser.github.io/sleepsort/sleep_sort_trimmed.html).

It works by starting one sleeper per item in the original list. Each sleeper sleeps for an amount of time equal to its item's value, then prints it. Smaller values wake up first, so the items come out in ascending order.

In [1]:
import asyncio

async def sleep_then_print(val):
    '''Gets called on every item in the list we want to sort'''
    await asyncio.sleep(val)
    print(val)  # Print only to demonstrate how SleepSort works
    return val

Pretty straightforward so far.


`sleep_then_print` is called on **each item** and:
1. Sleeps for the amount of time that **is** the value
2. Returns the value


To kick off `sleep_then_print` for each element, we're going to call `start_sleeping`:

In [2]:
async def start_sleeping(collection):
    '''Kick off each sleep_then_print task (but don't wait for it to finish)'''
    return [
        asyncio.create_task(sleep_then_print(val))
        for val in collection
    ]


Note:
1. **`create_task` starts a task**  
   Calling `asyncio.create_task` schedules a coroutine to start running on the event loop. But it doesn't wait for it to finish.  


2. **You can call your async function directly**  
   When working with threads, we would make our threads using something like
```python
Thread(target=sleep_then_print, args=(val, ))
```
We have to split the function and its arguments because we don't want to actually invoke our function yet.

   In async code, this doesn't apply. Since Python knows your function is async, calling it makes a coroutine object instead of actually invoking the function, so 
```python
asyncio.create_task(sleep_then_print(val))
```
works.


3. **Don't `await` `create_task`**  
   It's tempting to write
   ```python
await asyncio.create_task(sleep_then_print(val))
```
   but this would wait for each task to complete before starting the next one, making our sleeps run one after another instead of concurrently.  
We don't want that!
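
To make point 3 concrete, here is a minimal sketch that times both approaches. The helper names (`nap`, `awaited_one_by_one`, `started_then_awaited`, `timed`) are made up for this illustration, and it uses `asyncio.run`, so it is meant for a plain script rather than a notebook cell.

```python
import asyncio
import time


async def nap(seconds):
    '''Hypothetical stand-in for sleep_then_print, without the print'''
    await asyncio.sleep(seconds)
    return seconds


async def awaited_one_by_one(delays):
    '''Awaits each task right after creating it, so the sleeps run back to back'''
    results = []
    for delay in delays:
        results.append(await asyncio.create_task(nap(delay)))
    return results


async def started_then_awaited(delays):
    '''Creates every task first, then awaits them, so the sleeps overlap'''
    tasks = [asyncio.create_task(nap(delay)) for delay in delays]
    return [await task for task in tasks]


def timed(coroutine_function, delays):
    '''Runs the coroutine function in a fresh event loop and returns (result, seconds taken)'''
    start = time.perf_counter()
    result = asyncio.run(coroutine_function(delays))
    return result, time.perf_counter() - start
```

With three delays of 0.2 seconds each, `timed(awaited_one_by_one, ...)` should take roughly the sum of the delays, while `timed(started_then_awaited, ...)` should take roughly the longest single delay.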


In [3]:
async def async_sleep_sort(collection):
    '''You'll pass your list to this function when you want to sort it'''
    tasks = await start_sleeping(collection)
    
    return [
        await task
        for task in asyncio.as_completed(tasks)
    ]

A few interesting things:

1. **`await task` waits for a task to finish**  
   This is the async equivalent of `thread.join()`.


2. **`asyncio.as_completed` makes it all work**  
   If we returned 
   ```python
[await task for task in tasks]
```
 we would get the elements back in the order they started in.
   
   `asyncio.as_completed` is a fundamental part of SleepSort, since SleepSort relies on the sleeps completing in sorted order.
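
A small side-by-side sketch of that difference, assuming a plain script where `asyncio.run` is available. The names `nap`, `in_start_order` and `in_completion_order` are illustrative only.

```python
import asyncio


async def nap(seconds):
    '''Hypothetical stand-in for sleep_then_print, without the print'''
    await asyncio.sleep(seconds)
    return seconds


async def in_start_order(delays):
    '''Awaiting the tasks in list order returns results in the original order'''
    tasks = [asyncio.create_task(nap(delay)) for delay in delays]
    return [await task for task in tasks]


async def in_completion_order(delays):
    '''as_completed hands back awaitables in the order the tasks finish'''
    tasks = [asyncio.create_task(nap(delay)) for delay in delays]
    return [await finished for finished in asyncio.as_completed(tasks)]


if __name__ == '__main__':
    delays = [0.3, 0.1, 0.2]
    print(asyncio.run(in_start_order(delays)))       # [0.3, 0.1, 0.2]
    print(asyncio.run(in_completion_order(delays)))  # [0.1, 0.2, 0.3]
```

Both versions take about as long as the longest sleep. Only the order of the results changes.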


So let's check if it works!

In [4]:
collection = [3, 2, 5, 4, 1]
sorted_collection = await async_sleep_sort(collection)
sorted_collection

1
2
3
4
5


[1, 2, 3, 4, 5]

Woohooo!

Some points to note:
1. The value **1** is printed after 1 second. The value **2** is printed after 2 seconds. Etc.  
You're seeing SleepSort in action!


2. However, `async_sleep_sort` *returns* all the values only after the sort has completed. More on this later.


3. Usually, to run async code, we would call our coroutine like this:
```python
sorted_collection = asyncio.run(async_sleep_sort(collection))
```
It doesn't work here because this Jupyter notebook is [already running an asyncio event loop](https://stackoverflow.com/questions/47518874/how-do-i-run-python-asyncio-code-in-a-jupyter-notebook), so I just `await`ed the function instead.

## The SleepSort Generator


To prevent our users from having the feeling of waiting while we geocode their job, our backend *streams* geocoded addresses to the frontend **as soon as each geocode is completed.** 

This lets the frontend load them progressively on the map, all within a single request to the backend.

To make this work in our web framework, we need to [provide a generator](https://dev.to/rhymes/comment/2inm) to [response.stream](https://falcon.readthedocs.io/en/stable/api/request_and_response_wsgi.html#falcon.Request.stream). Calling `next()` on the generator needs to `yield` the next geocoded location, as soon as it is ready.

So let's see if we can make our async SleepSort function return our sorted items one by one as soon as their *sleep* is over.

In [5]:
async def async_sleep_sort_yield(collection):
    tasks = await start_sleeping(collection)
    
    # Let's try to yield the values as we get them.
    for task in asyncio.as_completed(tasks):
        yield await task
        
sleep_sort_generator = async_sleep_sort_yield(collection)

That's how you normally make a generator, right?

In [6]:
type(sleep_sort_generator)

async_generator

Interesting. We've got an `async_generator`, not a normal generator.  
Does it work the same?

In [7]:
next(sleep_sort_generator)

TypeError: 'async_generator' object is not an iterator

Clearly not.  
What if we remove the `await`? Will that let us make a *sync* `generator` instead??

In [8]:
async def async_sleep_sort_yield(collection):
    tasks = await start_sleeping(collection)
    
    for task in asyncio.as_completed(tasks):
        # yield await task
        yield task
        
sleep_sort_generator = async_sleep_sort_yield(collection)
next(sleep_sort_generator)

TypeError: 'async_generator' object is not an iterator

Still no.  
However, we need to make a real generator so we can stream it using our backend framework.

It's kind of weird though.  
Why **isn't** an `async_generator` an iterator? Why can't I just loop over it or call `next` on it?

## Two Worlds

People often talk about the *synchronous world* and the *asynchronous world*, and how they don't really interact.


You (mostly) can't call synchronous functions in the *async world* because they *block*.  
That is, they don't give up control, and therefore don't allow any other async functions to run concurrently.


Conversely, you can't call async functions in the *synchronous world* because they need an *async event loop* running in order to work.  
- You *can* run an event loop and then run them, using `asyncio.run(my_async_function())`  
- However, running an event loop **is a synchronous function**, so it *blocks*, and means that you can't run any other synchronous functions at the same time.
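
The first half of this (synchronous calls block the event loop) can be seen with a rough sketch like the one below. The function names are hypothetical, and the timings it reports will vary by machine.

```python
import asyncio
import time


async def blocking_nap(seconds):
    # time.sleep never gives control back to the event loop,
    # so no other coroutine can make progress in the meantime
    time.sleep(seconds)


async def cooperative_nap(seconds):
    # asyncio.sleep hands control back to the event loop while it waits
    await asyncio.sleep(seconds)


async def time_three(nap, seconds):
    '''Runs three naps "concurrently" and returns how long that took'''
    start = time.perf_counter()
    await asyncio.gather(nap(seconds), nap(seconds), nap(seconds))
    return time.perf_counter() - start
```

In a plain script, `asyncio.run(time_three(blocking_nap, 0.2))` should come out at roughly 0.6 seconds because the three naps run one after another, whereas `asyncio.run(time_three(cooperative_nap, 0.2))` should be close to 0.2 seconds.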

For the specific case of using an `async_generator` like a normal generator, I [learnt from StackOverflow](https://stackoverflow.com/questions/42448664/async-generator-is-not-an-iterator) that:

1. **You kind-of can with `async for`**  
   Using an `async for` loop instead of a `for` loop works as expected, except that...


2. **`async_generator`s require the async event loop to be running**  
   just like all async functionality in Python.  
   This means that, if my `async_generator` is created as part of an async event loop, I can't then use it after that loop ends.

   As a result, the core devs have intentionally kept these syntactically different so that this doesn't surprise people, in accordance with Tim Peters's advice that "explicit is better than implicit".
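
For reference, here is a minimal sketch of consuming an async generator with `async for`. It only works from inside another coroutine, which is exactly the restriction described above. The names `completed_values` and `collect` are made up for the example.

```python
import asyncio


async def completed_values(delays):
    '''Async generator: yields each delay as soon as its sleep has finished'''
    tasks = [asyncio.create_task(asyncio.sleep(delay, result=delay)) for delay in delays]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def collect(delays):
    '''`async for` is only legal inside a coroutine, so the consumer has to be async too'''
    return [value async for value in completed_values(delays)]


if __name__ == '__main__':
    print(asyncio.run(collect([0.2, 0.1, 0.3])))  # [0.1, 0.2, 0.3]
```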

This is very relevant to our use case, since our web framework is not async.  
Therefore, it can't stream an HTTP response while an event loop is running.

At least not in the same thread...


## The Threaded SleepSort Generator
I really wanted to make this work. So I wondered if I could have the async event loop running in a thread in the background, then somehow feed the values back into the synchronous world so I could then `yield` them back *as they are completed*.

Let's see what we can do.

First, let's make a context manager that will help us run our function in a thread.

In [9]:
from threading import Thread
from contextlib import contextmanager

@contextmanager
def new_thread(*args, **kwargs):
    t = Thread(*args, **kwargs)
    t.start()
    try:
        yield
    finally:
        t.join()
        

It just makes a thread and guarantees we clean up our thread after it is done.  
You can use it like:

In [10]:
with new_thread(target=print, args=('print from new thread', )):
    print('print from main thread')
    

print from new thread
print from main thread


Or a more relevant example:

In [11]:
import time

coroutine = async_sleep_sort(collection)   # doesn't call async_sleep_sort... just builds a coroutine.

with new_thread(target=asyncio.run, args=(coroutine, )):
    time.sleep(2.5)
    print('Sleep sort is running in a new thread, and could be sending values to us here in the main thread '
          'and at the same time, the main thread could be passing those values on to the API caller')

1
2
Sleep sort is running in a new thread, and could be sending values to us here in the main thread and at the same time, the main thread could be passing those values on to the API caller
3
4
5


> If you're wondering why we can use `asyncio.run` here, yet we couldn't before, it's because we're running a **new** async event loop in the new thread.

So, yeah, we're still using threads in the end for our geocoder.  
But at least we're only going to create one new thread instead of creating 20 (or however many workers we set in the `ThreadPoolExecutor`).

So now let's make our generator! 

In [12]:
def sleep_sort(collection):
    coroutine = async_sleep_sort(collection)
    with new_thread(target=asyncio.run, args=(coroutine, )):
        for _ in collection:
            yield '????'       # ummmmmmm.... what do we yield?
            
gen = sleep_sort(collection)
for val in gen:
    print(val)

????
????
????
????
????
1
2
3
4
5


Huh.

We have made a generator that can yield values while `async_sleep_sort` is running in a different thread.

But now... somehow we have to get the values back from that other thread into the main thread.  
*And* we need to get them as they are returned (not all at once at the end).

## The Threaded SleepSort Generator with Queue

> The `queue` module is ... especially useful in threaded programming when information must be exchanged safely between multiple threads.  

\- The [Python Docs on Queues](https://docs.python.org/3/library/queue.html)

Sounds good to me.

Maybe we could:
1. Add values to a queue in `async_sleep_sort` while it's running in the thread
2. Yield values back from that queue in our synchronous `sleep_sort` generator

So let's do it.

In [13]:
# 1. Add values to the queue in our `async_sleep_sort` function that's running in the thread

async def async_sleep_sort(collection, queue):
    tasks = await start_sleeping(collection)
    
    for task in asyncio.as_completed(tasks):
        val = await task
        queue.put(val)
        

In [14]:
from queue import Queue

# 2. Yield values back from that queue in our synchronous `sleep_sort` generator 

def sleep_sort(collection):
    queue = Queue()
    coroutine = async_sleep_sort(collection, queue)
    with new_thread(target=asyncio.run, args=(coroutine, )):
        for _ in collection:
            yield queue.get()
            

Let's see if it works!

In [15]:
for value in sleep_sort(collection):
    print(f'Generator yielded {value}')

1
Generator yielded 1
2
Generator yielded 2
3
Generator yielded 3
4
Generator yielded 4
5
Generator yielded 5


![It's Working!!](https://media1.tenor.com/images/128fd3a348e0f90d9acea9e57fee72bb/tenor.gif?itemid=4777331 "chess")

Nice!

Notice:
- The generator yields the values as soon as they are put in the queue, which happens as each sleep finishes. Perfect.


- We're still printing the value from within the `sleep_then_print` function, which is why we see every number come back twice.  
We don't really need this anymore, since we can access the returned values as soon as they're ready. So let's remove the `print` and rename accordingly.


If we make that change, and put all the code together, here's what we end up with!

In [16]:
import asyncio

from contextlib import contextmanager
from queue import Queue
from threading import Thread


async def sleep_then_return(val):
    '''Gets called on every item in the list we want to sort'''
    await asyncio.sleep(val)
    return val


async def start_sleeping(collection):
    '''Kick off each sleep_then_return task (but don't wait for it to finish)'''
    return [
        asyncio.create_task(sleep_then_return(val))
        for val in collection
    ]


async def async_sleep_sort(collection, queue):
    '''Run the async part of the SleepSort, which adds each item to the queue when it has finished sleeping.
    This is designed to be run inside a thread'''
    tasks = await start_sleeping(collection)
    
    for task in asyncio.as_completed(tasks):
        val = await task
        queue.put(val)


@contextmanager
def new_thread(*args, **kwargs):
    '''A helper to conveniently make, start, and clean up a Thread'''
    t = Thread(*args, **kwargs)
    t.start()
    try:
        yield
    finally:
        t.join()
        
        
def sleep_sort(collection):
    '''The SleepSort Generator which
    1. Runs async_sleep_sort in a thread
    2. Collects the results from it using a queue
    3. Yields them as they are ready'''
    queue = Queue()
    coroutine = async_sleep_sort(collection, queue)
    with new_thread(target=asyncio.run, args=(coroutine, )):
        for _ in collection:
            yield queue.get()
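
The generator above counts results with `for _ in collection`, which assumes the collection can be iterated a second time and that every task succeeds. One possible variation, shown here purely as a sketch and not as part of the original code, is to have the async side put a sentinel on the queue when it is done. The names `_DONE`, `_sleep_then_return`, `_produce` and `sleep_sort_with_sentinel` are hypothetical.

```python
import asyncio
from queue import Queue
from threading import Thread

_DONE = object()  # hypothetical sentinel marking "no more values"


async def _sleep_then_return(val):
    await asyncio.sleep(val)
    return val


async def _produce(values, queue):
    '''Puts each value on the queue as its sleep finishes, then always signals completion'''
    try:
        tasks = [asyncio.create_task(_sleep_then_return(val)) for val in values]
        for finished in asyncio.as_completed(tasks):
            queue.put(await finished)
    finally:
        queue.put(_DONE)  # runs even if a task raised, so the consumer never blocks forever


def sleep_sort_with_sentinel(collection):
    '''Synchronous generator that stops when it sees the sentinel, not after a fixed count'''
    values = list(collection)  # copy once, so one-shot iterators work too
    queue = Queue()
    thread = Thread(target=asyncio.run, args=(_produce(values, queue),))
    thread.start()
    try:
        while (item := queue.get()) is not _DONE:
            yield item
    finally:
        thread.join()
```

Under these assumptions, `list(sleep_sort_with_sentinel(iter([0.3, 0.1, 0.2])))` still yields the values in ascending order, even though the input is an iterator that can only be consumed once.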
            

Let's test

In [17]:
# Test sleep_sort() gives the same result as sort()
collection = [0.1, 0.4, 0.2, 0.25, 0.3]  # using smaller numbers so SleepSort works faster!
assert list(sorted(collection)) == list(sleep_sort(collection))

In [18]:
# Test sleep_sort() yields values as they are completed
import time
eps = 0.01
start = time.time()

for val in sleep_sort(collection):
    elapsed = time.time() - start
    
    assert val - eps < elapsed < val + eps
    # e.g. the value "3" should be returned after between 2.99 and 3.01 seconds
    

In [19]:
# Test it works with other collections
collection = {0.1, 0.4, 0.2, 0.25, 0.3}
assert list(sorted(collection)) == list(sleep_sort(collection))

collection = (0.1, 0.4, 0.2, 0.25, 0.3)
assert list(sorted(collection)) == list(sleep_sort(collection))

In [20]:
# Test we can sort 50 numbers in a second
import random
collection = [random.random() for _ in range(50)]
assert list(sorted(collection)) == list(sleep_sort(collection))

## Limitations

In [21]:
# Test we can sort 100 numbers in a second
collection = [random.random() for _ in range(100)]
assert list(sorted(collection)) == list(sleep_sort(collection))

AssertionError: 

So... It works when I try to sort 50 numbers between 0 and 1  
...but fails when I try to sort 100.

Why?

It's because our `sleep_then_return` coroutines don't start at exactly the same instant.  
This happens because it takes time for Python code to run, and every call to every function has an overhead associated with it.


Therefore, if two values in our collection are too close together, we could have a situation where the overhead is larger than the difference in sleep time, which messes up our return sequence and our SleepSort!
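
To get a feel for the size of that overhead, here is a rough sketch that records when each of `n` tasks actually starts running and reports the gap between the first and the last. The names `record_start` and `start_spread` are made up, and the number you get depends entirely on your machine and Python version.

```python
import asyncio
import time


async def record_start(starts):
    '''Notes the moment this task actually begins executing'''
    starts.append(time.perf_counter())


async def start_spread(n):
    '''Returns the seconds between the first and the last of n tasks starting'''
    starts = []
    tasks = [asyncio.create_task(record_start(starts)) for _ in range(n)]
    await asyncio.gather(*tasks)
    return max(starts) - min(starts)


if __name__ == '__main__':
    for n in (50, 100, 1000):
        print(n, asyncio.run(start_spread(n)))
```

Any two values in the collection that differ by less than this kind of spread are candidates for coming out in the wrong order.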

BUT! I have a clever idea. What if we multiply all values by 10, then they won't be so close together!

In [22]:
collection = [10*random.random() for _ in range(100)]
assert list(sorted(collection)) == list(sleep_sort(collection))

Great, it works! (most of the time...)

But it also takes 10x longer to run...

## The Tradeoff

SleepSort has this interesting property. Its run-time is the largest value (plus a small overhead).  
- So if I want to sort 10 numbers between 0 and 1, it takes up to a second 
- But if I want to sort 10 numbers between 0 and 10, it takes up to 10 seconds

The immediately obvious conclusion is 
>"oh wow, I could sort a trillion numbers in a second, I just have to scale them so they're all between 0 and 1"

Unfortunately, it doesn't work due to that overhead.

And this is interesting in a few ways:
1. If we want to sort more numbers (or if some of them are closer together), we can make it work by scaling them up and increasing the time.
2. It's not just a question of "does it work", it's also a question of "how sorted do we want it?".
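
One way to picture this lever is as an explicit `scale` parameter that stretches only the sleep time and leaves the returned values untouched. This is a hypothetical async-only sketch (no thread or queue), not the code used elsewhere in this post.

```python
import asyncio


async def sleep_scaled_then_return(val, scale):
    '''Sleeps for val * scale seconds but returns the original, unscaled value'''
    await asyncio.sleep(val * scale)
    return val


async def scaled_sleep_sort(collection, scale=1.0):
    '''Larger scale: slower but more reliable. Smaller scale: faster but rougher.'''
    tasks = [
        asyncio.create_task(sleep_scaled_then_return(val, scale))
        for val in collection
    ]
    return [await finished for finished in asyncio.as_completed(tasks)]


if __name__ == '__main__':
    print(asyncio.run(scaled_sleep_sort([3, 1, 2], scale=0.1)))  # [1, 2, 3]
```

Returning the original value instead of dividing the scaled one back down avoids introducing floating-point rounding into the results.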

In [23]:
collection = [
    0.0005403, 0.0000248, 0.0018098, 0.0017214, 0.0006107, 
    0.0017219, 0.0001389, 0.0004980, 0.0013523, 0.0001567, 
    0.0009915, 0.0005340, 0.0000735, 0.0009510, 0.0015502, 
    0.0000313, 0.0002114, 0.0016292, 0.0006768, 0.0019651,
]
[val for val in sleep_sort(collection)]

[2.48e-05,
 3.13e-05,
 7.35e-05,
 0.0001389,
 0.0001567,
 0.0002114,
 0.0005403,
 0.000498,
 0.000534,
 0.0006107,
 0.0006768,
 0.000951,
 0.0009915,
 0.0013523,
 0.0015502,
 0.0016292,
 0.0017214,
 0.0017219,
 0.0018098,
 0.0019651]

Here's one example output (one time when I ran the above script... it's different every time)
![image.png](almost_sorted.png "image")
We can see that this list is *almost* sorted, but not quite.
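
One simple way to put a number on "almost sorted" is to count inversions: pairs of elements that appear in the wrong order relative to each other. The helper below is an illustrative sketch (quadratic, so only suitable for short lists), applied to the example output shown above.

```python
from itertools import combinations


def count_inversions(seq):
    '''Counts pairs (earlier, later) where the earlier element is larger. 0 means fully sorted.'''
    return sum(1 for earlier, later in combinations(seq, 2) if earlier > later)


almost_sorted = [
    2.48e-05, 3.13e-05, 7.35e-05, 0.0001389, 0.0001567,
    0.0002114, 0.0005403, 0.000498, 0.000534, 0.0006107,
    0.0006768, 0.000951, 0.0009915, 0.0013523, 0.0015502,
    0.0016292, 0.0017214, 0.0017219, 0.0018098, 0.0019651,
]

print(count_inversions(almost_sorted))  # 2 (0.0005403 comes out before 0.000498 and 0.000534)
```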

If we multiplied these values by 100, it would be perfectly sorted, but would take longer.

Therefore, we have an interesting tradeoff.

### Correctness vs Performance

SleepSort gives us this kind of *lever*, which allows us to trade off correctness against performance.
- We can run it slowly to be sure that the result is correct  
   OR
- We can run it fast and get a "rough" answer quickly.

Interesting.

But it had me thinking. Why do we need to sleep the full amount of time... isn't there some shortcut way to just figure out the next sleep to finish, and return it immediately? That would be heaps better!

It turns out there is. But unfortunately it's not quite the holy grail I had imagined...

## Heaps better
>...some shortcut way to just figure out the next sleep to finish...

It turns out this *shortcut way* is just called *sorting*. Bugger.

Let's take a look anyway.

The most intuitive idea is to use something called a **Priority Queue**.

Basically, you add elements into a Queue. But instead of them coming out in the order they went in (FIFO), they come out in the order that you **prioritise** them to come out in.

With Priority Queues, **prioritising** the elements just means you assign a number to how high priority each element is.  
So with our list we want to sort, we can set the priority of each element to its value, analogously to how we set its sleep time to its value in SleepSort.

So, if we wanted to sort a collection of elements, we could:
1. Add the elements to a Priority Queue
2. Take elements from the Priority Queue in order of priority

The whole reason why this works, and is pretty fast, is because [Priority Queues are implemented using Heaps](https://course.ccs.neu.edu/cs2510h/lecture29.html), which are a type of binary tree where the *Heap Invariant* holds. It just means that the tree is constructed in a special way where every parent node has a higher priority than its children. In a max-heap that means a higher value; in the min-heap behind Python's `PriorityQueue` it means a lower (or equal) value, so the smallest item is always at the root.

So, as it turns out, this sort algorithm is called HeapSort.  
*And*, it has the added advantage of not needing to run concurrently.

Let's check it out

In [24]:
from queue import PriorityQueue

pqueue = PriorityQueue()

collection = [3, 2, 5, 4, 1]

for item in collection:
    pqueue.put(item)
    
output = [pqueue.get() for _ in collection]

assert output == list(sorted(collection))

Simple!
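
In CPython, `queue.PriorityQueue` is a thread-safe wrapper around the standard library's `heapq` module, so the same idea can be sketched without the queue. The function name `heap_sort` is just for illustration.

```python
import heapq


def heap_sort(collection):
    '''Builds a min-heap, then repeatedly pops the smallest remaining item'''
    heap = list(collection)
    heapq.heapify(heap)  # rearranges the list in place so that heap[0] is the smallest item
    return [heapq.heappop(heap) for _ in range(len(heap))]


print(heap_sort([3, 2, 5, 4, 1]))  # [1, 2, 3, 4, 5]
```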

## Async Geocoding
So, after this exploration, will we switch to async geocoding?  
**Probably not.**

- It works fine as it is
- There is negligible performance upside to changing
- It's not a plug-and-play change - we would need to change our geocoding code to make async requests etc


If we were to rewrite it from scratch though, I think we would have a different answer.