<div style="overflow-x:unset"><h1>whoami<span class="blink">_</span></h1></div>

# `asyncio`: We Did It Wrong

<h3> Lynn Root <span style="color:rgb(25,230,140)">|</span> Spotify <span style="color:rgb(25,230,140)">|</span> @roguelynn</h3>

* Internal FOSS Evangelist

* Global PyLadies Leader

I'm a site reliability engineer at Spotify, which basically means I either break our entire service, or get paged to fix it when other people do.

In actuality, what an SRE does at Spotify (it varies widely between companies) is a combination of backend development, where my team and I run a few services that other engineers use daily, plus a little devops and system administration.

I'm also our FOSS evangelist: I help a lot of teams release their projects and tools under the spotify GitHub organization.

Lastly, I help lead PyLadies - a global mentorship group for women, and friends, to help increase diversity in the Python community.

Before I start, I want to warn y'all: I will use all of the time allotted, so I will not have time for a Q&A session. _You might even think I do this purposefully to try to avoid Q&As._ But! I am available afterwards to chat, and I'll have a link at the end for the notebook & example code here.

# async all the things

<img src="images/xkcd_python.png" alt="import gravity" style="width: 400px;"/>

asyncio - The concurrent Python programmer’s dream, the answer to everyone's asynchronous prayers. 

The `asyncio` module has various layers of abstraction allowing developers as much control as they need and are comfortable with. 

```pycon
Python 3.7.0 (default, Jul  6 2018, 11:30:06)
[Clang 9.1.0 (clang-902.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
...   print(f'[{datetime.datetime.now()}] Hello...')
...   await asyncio.sleep(1)  # some I/O-intensive work
...   print(f'[{datetime.datetime.now()}] ...World!')
...
>>> asyncio.run(hello())
[2018-07-07 10:45:55.559856] Hello...
[2018-07-07 10:45:56.568737] ...World!
```

Simple "Hello, World"-like examples show how it can be so effortless, look at that!

But it's easy to get lulled into a false sense of security. This ain't helpful.

# Fake News.

<p style="font-size:75%">We're led to believe that we're able to do a lot with the structured `async`/`await` API layer. Some tutorials, while great for the developer getting their toes wet, try to illustrate real world examples, but are actually just beefed-up "hello, world"s.</p>

<p style="font-size:75%">Some even misuse parts of `asyncio`'s interface, allowing one to easily fall into the depths of callback hell. </p>

<p style="font-size:75%">Some get you easily up and running with `asyncio`, but then you may not realize it's not correct or exactly what you want, or only gets you part of the way there. While some tutorials and walk throughs do a lot to improve upon the basic "hello, world" use case, it is still just a web crawler. I'm not sure about others, but I'm not building web crawlers at Spotify.</p>

<p style="font-size:75%">Asynchronous programming is difficult. Whether you use `asyncio`, Twisted, Tornado, or Golang, Erlang, Haskell, whatever, it's just difficult.</p>

<p style="font-size:75%">Within my team at Spotify, we (mostly me :-!) fell into this false sense of ease that the `asyncio` community builds. The past couple of services we built, we felt they were perfect use cases for `asyncio`: a chaos-monkey like service for restarting instances, and an event-driven hostname generation service for DNS.</p>

<p style="font-size:75%">Sure, we needed to make a lot of HTTP requests that should be non-blocking. But these services also had to react to events from a pubsub, measure the progress of actions initiated from those events, handle any incomplete actions or other external errors, deal with pubsub message lease management, measure service level indicators, and send metrics. We also needed to use non-`asyncio`-friendly dependencies. This quickly got difficult.</p>


# DIY Chaos Monkey

<h2>Mayhem Mandrill</h2>
<img src="images/mandrill_yell.jpg" alt="yelling mandrill" style="width: 600px;"/>

Allow me to provide you a real-world example that actually comes from the real world. Recently at Spotify, we built a service that does periodic hard restarts of our entire fleet of instances. 

And we’re going to do that here. Let’s build a service called Mayhem Mandrill which will listen for a pub/sub message and restart a host based off of that message. As we build this service, I’ll point out the traps that I may or may not have fallen into. This will essentially become the type of resource that past Lynn would have wanted a year or two ago. 

# Initial Setup

At Spotify we use Google Cloud Pub/Sub. However, there are a lot of other choices out there. 

For our purposes, we'll simulate a pub/sub technology with `asyncio`…

### Simulate an external publisher of messages
*Adapted from [asyncio.readthedocs.io](http://asyncio.readthedocs.io/en/latest/producer_consumer.html)*

This is pretty much inspired by a tutorial from the asyncio docs.

In [None]:
async def publish(queue, n):
    for x in range(1, n + 1):
        instance_name = f'cattle-{x}'
        msg = Message(msg_id=x, inst_name=instance_name)
            
        await queue.put(msg)  # "publish" a message
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done

In [None]:
async def consume(queue):
    while True:
        # wait to "consume" a message
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        logging.info(f'Consumed {msg}')
        # unhelpful simulation of i/o work
        await asyncio.sleep(random.random())

In [None]:
queue = asyncio.Queue()
asyncio.run(publish(queue, 5))
asyncio.run(consume(queue))

Using Python 3.7's latest syntactic sugar! When we run this, we see the following:

```sh
$ python mandrill/mayhem.py
```
```
14:36:21,802 INFO: Published 1 of 5 messages
14:36:21,802 INFO: Published 2 of 5 messages
14:36:21,802 INFO: Published 3 of 5 messages
14:36:21,802 INFO: Published 4 of 5 messages
14:36:21,803 INFO: Published 5 of 5 messages
14:36:21,804 INFO: Consumed Message(inst_name='cattle-jg4t')
14:36:22,780 INFO: Consumed Message(inst_name='cattle-hz84')
14:36:23,558 INFO: Consumed Message(inst_name='cattle-kd7q')
14:36:23,938 INFO: Consumed Message(inst_name='cattle-z0ww')
14:36:24,815 INFO: Consumed Message(inst_name='cattle-3hka')
```

So let's work off of this. We'll first start with boilerplate-like code to start and stop this pub/sub simulator.
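
The snippets above lean on a `Message` type and a logging setup that aren't shown here. Below is a minimal sketch of what they might look like, assuming a standard-library dataclass and a log format picked to resemble the output above. The field types, the `.example.net` suffix on `hostname`, and the format string are my assumptions for illustration, not the service's actual code.

```python
import logging
from dataclasses import dataclass, field
from typing import Union

# Assumed log format, chosen to resemble the sample output in this walkthrough.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@dataclass
class Message:
    """Hypothetical stand-in for the pub/sub message used in the examples."""
    inst_name: str
    msg_id: Union[int, str] = field(repr=False)       # kept out of the repr
    hostname: str = field(repr=False, init=False)     # derived, not passed in

    def __post_init__(self):
        # Assumption: the hostname is the instance name plus a fixed domain.
        self.hostname = f'{self.inst_name}.example.net'


msg = Message(msg_id=1, inst_name='cattle-1')
logging.info(f'Built {msg} for host {msg.hostname}')
```

Keeping `msg_id` and `hostname` out of the repr is just a guess at why the log lines only show `inst_name`.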

# Running an `asyncio`-based Service

<h1><span style="font-size:50%;font-style:italic;color:gray;">Running an <code>asyncio</code>-based service</span><br/><code>loop.run_until_complete</code> versus <code>asyncio.run</code></h1>

While pretty simple, not many examples and tutorials dive into starting an `asyncio` service. The usual entry point to starting a service on the event loop is either [`loop.run_forever`][18] or [`loop.run_until_complete`][19] (or the newest addition, `asyncio.run`) with both sometimes being used.

Reworking the example above, we'll use `loop.run_until_complete` (versus `loop.run_forever`) since there is a finite number of messages to produce and to consume. Before, we used `asyncio.run` ([provisionally available as of 3.7.0](https://docs.python.org/3/whatsnew/3.7.html#asyncio)), which handles the event loop management for us and is [designed to be an entry point](https://docs.python.org/3/library/asyncio-task.html#asyncio.run) for `asyncio` programs. However, it's not ideal for our usage, as it's wasteful to call it more than once: [it creates and closes an event loop each time it's called](https://github.com/python/cpython/blob/416c1ebd9896b394790dcb4f9f035b1a44ebe9ff/Lib/asyncio/runners.py#L8-L50). It also may not be ideal for long-running services, as it uses `run_until_complete` under the hood.
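
To see that create-and-close behavior for yourself, here's a small sketch. The `current_loop` helper is a name I made up for illustration; it just hands back whichever loop is running it.

```python
import asyncio


async def current_loop():
    # Return the event loop that is currently running this coroutine.
    return asyncio.get_running_loop()


# Each asyncio.run call sets up its own loop and closes it on the way out.
first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)                          # False: two separate loops
print(first.is_closed(), second.is_closed())    # True True: both already closed
```

So two back-to-back `asyncio.run` calls share nothing loop-wise, which is the overhead we'd rather avoid here.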

[18]: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.run_forever
[19]: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.run_until_complete

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(publish(queue, 5))
    loop.run_until_complete(consume(queue))

Let's rework the `__main__` part just a bit to reflect creating our own loop and running what we need ourselves. 

This is fine for playing around, for scripting and such. But we'll see in a bit that we should not do this when running in production.

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(publish(queue, 5))
    loop.run_until_complete(consume(queue))
    loop.close()

Since we created and started the event loop, we should clean it up too

<h1><span style="font-size:50%;font-style:italic;color:gray;">Running an <code>asyncio</code>-based service</span><br/><code>loop.run_forever</code></h1>

So far, we haven't built an actual service; it's merely a pipeline or a batch job right now. In order to run continuously, we need to use `loop.run_forever`. For this, we have to create and schedule tasks out of the coroutines, then start the loop:

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info('Done')

Since we created and started the event loop, we should clean it up too. 

So then running with this updated code...

```sh
$ python mandrill/mayhem_3.py
```
```
19:45:17,540 INFO: Published 1 of 5 messages
19:45:17,540 INFO: Published 2 of 5 messages
19:45:17,541 INFO: Published 3 of 5 messages
19:45:17,541 INFO: Published 4 of 5 messages
19:45:17,541 INFO: Published 5 of 5 messages
19:45:17,541 INFO: Consumed Message(inst_name='cattle-ms1t')
19:45:17,749 INFO: Consumed Message(inst_name='cattle-p6l9')
19:45:17,958 INFO: Consumed Message(inst_name='cattle-kd7q')
19:45:18,238 INFO: Consumed Message(inst_name='cattle-z0ww')
19:45:18,415 INFO: Consumed Message(inst_name='cattle-3hka')
^CTraceback (most recent call last):
  File "mandrill/mayhem_3.py", line 68, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
```

...we see that all messages are published and then consumed, and then we hang, because there is no more work to be done (we only published 5 messages, after all). To stop the process, we have to interrupt it (via `^C` or sending a signal like `kill -9 <pid>`).

So yeah, that's nice and ugly... You may notice that we also never get to the `loop.close()` line. Nor are we handling any exceptions that may be raised while awaiting the `publish` and `consume` coroutines.
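
For the finite, five-message case, one way to actually reach `loop.close()` is to have something inside the loop call `loop.stop()` once the work is done. This is only a sketch under my own assumptions: `run_then_stop` and the `consumed` list are hypothetical names, plain integers stand in for messages, and I use `asyncio.new_event_loop()` so the example doesn't depend on an implicit default loop.

```python
import asyncio


async def publish(queue, n):
    for x in range(1, n + 1):
        await queue.put(x)
    await queue.put(None)  # publisher is done


async def consume(queue, consumed):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break
        consumed.append(msg)


async def run_then_stop(loop, consumed):
    # Hypothetical wrapper: run both sides, then ask the loop to stop.
    queue = asyncio.Queue()
    publisher = loop.create_task(publish(queue, 5))
    await consume(queue, consumed)
    await publisher
    loop.stop()  # run_forever returns once the current iteration finishes


loop = asyncio.new_event_loop()
consumed = []
loop.create_task(run_then_stop(loop, consumed))
try:
    loop.run_forever()
finally:
    loop.close()  # now reachable without ^C

print(consumed)
```

A real service wouldn't stop itself like this, of course; it only shows that `run_forever` returns when something calls `loop.stop()`.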

<h1><span style="font-size:50%;font-style:italic;color:gray;">Running an <code>asyncio</code>-based service</span><br/>Running the event loop defensively</h1>

We'll first address the catching of exceptions that arise from coroutines. Let's fake an error in the `consume` coroutine:

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        # super-realistic simulation of an exception
        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        # unhelpful simulation of an i/o operation
        await asyncio.sleep(random.random())

If we run it as is,

```sh
$ python mandrill/mayhem_3.py
```
<pre><code>
17:39:52,933 INFO: Published 1 of 5 messages
17:39:52,933 INFO: Published 2 of 5 messages
17:39:52,933 INFO: Published 3 of 5 messages
17:39:52,933 INFO: Published 4 of 5 messages
17:39:52,933 INFO: Published 5 of 5 messages
17:39:52,933 INFO: Consumed Message(inst_name='cattle-cu7f')
17:39:53,876 INFO: Consumed Message(inst_name='cattle-xihm')
17:39:54,599 INFO: Consumed Message(inst_name='cattle-clnn')
<span class="asyncio-hl">17:39:55,051 ERROR: Task exception was never retrieved</span>
future: <Task finished coro=<consume() done, defined at mandrill/mayhem_3.py:45> exception=Exception('an exception happened!')>
Traceback (most recent call last):
  File "mandrill/mayhem_3.py", line 52, in consume
    raise Exception('an exception happened!')
Exception: an exception happened!
^CTraceback (most recent call last):
  File "mandrill/mayhem_3.py", line 72, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
</code></pre>

We get an error saying "exception was never retrieved." This is admittedly a part of the `asyncio` API that's not that friendly. If this were synchronous code, we'd simply see the error that we raised. But here it gets swallowed up into this unretrieved task. 

So to deal with this, as [advised in the asyncio documentation](https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed), we'll need to have a wrapper coroutine to consume the exception and stop the loop.

In [46]:
async def handle_exception(coro, loop):
    try:
        await coro
    except Exception as e:
        logging.error(f'Caught exception: {e}')
        loop.stop()  # may not need/want to do this

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.create_task(handle_exception(publish(queue, 5), loop))
    loop.create_task(handle_exception(consume(queue), loop))
    try:
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.close()

So we make a little top-level wrapper to run coroutines and handle their exceptions. Now when we run our script, we get something a little cleaner:

```sh
$ python mandrill/mayhem_4.py
```
<pre><code>
17:46:01,208 INFO: Published 1 of 5 messages
17:46:01,208 INFO: Published 2 of 5 messages
17:46:01,208 INFO: Published 3 of 5 messages
17:46:01,208 INFO: Published 4 of 5 messages
17:46:01,209 INFO: Published 5 of 5 messages
17:46:01,209 INFO: Consumed Message(inst_name='cattle-hotv')
17:46:01,824 INFO: Consumed Message(inst_name='cattle-un2v')
17:46:02,139 INFO: Consumed Message(inst_name='cattle-0qe3')
<span class="asyncio-hl">17:46:02,671 ERROR: Caught exception: an exception happened!</span>
17:46:02,672 INFO: Cleaning up
</code></pre>

Ah that's a bit more clear.

In [44]:
async def handle_exception(coro, loop):
    try:
        await coro
    except Exception as e:
        logging.error(f'Caught exception: {e}', exc_info=e)
        loop.stop()

If you'd like to see more information about that exception, we can update the log line to also include the traceback.

```sh
$ python mandrill/mayhem_4.py
```
```
13:57:33,934 INFO: Published 1 of 5 messages
13:57:33,934 INFO: Published 2 of 5 messages
13:57:33,934 INFO: Published 3 of 5 messages
13:57:33,934 INFO: Published 4 of 5 messages
13:57:33,934 INFO: Published 5 of 5 messages
13:57:33,934 INFO: Consumed Message(inst_name='cattle-7mkj')
13:57:34,311 INFO: Consumed Message(inst_name='cattle-rx3o')
13:57:35,117 INFO: Consumed Message(inst_name='cattle-t6ut')
13:57:35,951 ERROR: Caught exception: an exception happened!
Traceback (most recent call last):
  File "mandrill/mayhem_4.py", line 67, in handle_exception
    await fn()
  File "mandrill/mayhem_4.py", line 53, in consume
    raise Exception('an exception happened!')
Exception: an exception happened!
13:57:35,952 INFO: Cleaning up
```

<h1><span style="font-size:50%;font-style:italic;color:gray;">TL;DR: Running an <code>asyncio</code>-based service</span></h1>

* Don't accidentally swallow exceptions; be sure to "retrieve" them

* Clean up after yourself – `loop.close()`

So far, for setting up an asyncio service, you want to be sure you surface the exceptions from your coroutines, and to clean up what you've created. We'll expand on that clean up bit later on. This is clean enough for now.

<h1>We're still blocking</h1>
<img src="images/surprised_mandrill_3.jpg" alt="yelling mandrill"/>

I've seen quite a few tutorials that make use of `async` and `await` in a way that, while it does not block the event loop, still iterates through tasks serially, effectively not adding any concurrency. 

Taking a look at where our script is now:

In [None]:
async def publish(queue, n):
    for x in range(1, n + 1):
        instance_name = f'cattle-{x}'
        msg = Message(msg_id=x, inst_name=instance_name)
                
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        await asyncio.sleep(random.random())  # some i/o work

As this was adapted from [the asyncio tutorial](http://asyncio.readthedocs.io/en/latest/producer_consumer.html), we are still serially processing each item we produce, and then consume. The event loop itself isn't blocked; if we had other tasks/coroutines going on, they of course wouldn't be blocked. 

This might seem obvious to some, but it definitely isn't to all. We **are** blocking ourselves; first we produce all the messages, one by one. Then we consume them, one by one. The loops we have (`for x in range(1, n+1)` in `publish()`, and `while True` in `consume()`) block ourselves from moving onto the next message while we await to do something. 

While this is technically a working example of a pub/sub-like queue with `asyncio`, it's not what we want. Whether we are building an event-driven service (like this walk through), or a pipeline/batch job, we're not taking advantage of the concurrency that `asyncio` can provide.
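
To make the difference concrete, here's a rough timing sketch of my own (not from the service) that runs the same three bits of simulated i/o twice: once awaited one after another, and once scheduled as tasks. The `work`, `serial`, and `concurrent` names are made up for illustration.

```python
import asyncio
import time


async def work(delay):
    # Stand-in for some i/o-bound operation.
    await asyncio.sleep(delay)


async def serial(delays):
    start = time.perf_counter()
    for delay in delays:
        await work(delay)  # each await finishes before the next one starts
    return time.perf_counter() - start


async def concurrent(delays):
    start = time.perf_counter()
    # Schedule everything up front, then wait for all of it.
    tasks = [asyncio.create_task(work(delay)) for delay in delays]
    for task in tasks:
        await task
    return time.perf_counter() - start


async def main():
    delays = [0.1, 0.1, 0.1]
    return await serial(delays), await concurrent(delays)


serial_time, concurrent_time = asyncio.run(main())
print(f'serial: {serial_time:.2f}s, concurrent: {concurrent_time:.2f}s')
```

The serial version should take roughly the sum of the delays, while the task-based version should take roughly the longest single delay.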

## Aside: Compare to synchronous code

I find `asyncio`'s API to be quite user-friendly (although some [disagree](https://veriny.tf/asyncio-a-dumpster-fire-of-bad-design/) with valid reasons). It's very easy to get up and running with the event loop. When first picking up concurrency, this `async` and `await` syntax makes it a low hurdle to start using since it makes it very similar to writing synchronous code. 

But again, when first picking up concurrency, this API is deceptive and misleading. Yes, we are using the event loop and `asyncio` primitives. Yes it does work. Yes it seems faster – but that's probably because you just came from 2.7 (welcome to 2014, by the way).

In [None]:
import queue

if __name__ == '__main__':
    queue = queue.Queue()
    publish(queue, 5)
    consume(queue)

To illustrate how it's no different than synchronous code, here's the same script with all `asyncio`-related primitives removed; we'll just look at `consume` since `publish` is pretty much the same:

In [None]:
def publish(queue, n):
    for x in range(1, n + 1):
        instance_name = f'cattle-{x}'
        msg = Message(msg_id=x, inst_name=instance_name)
    
        queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    queue.put(None)  # publisher is done

In [None]:
async def publish(queue, n):
    for x in range(1, n + 1):
        instance_name = f'cattle-{x}'
        msg = Message(msg_id=x, inst_name=instance_name)
                
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done

```py3
def consume(queue):
    while True:
        msg = queue.get()
        if msg is None:  # publisher is done
            break

        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        time.sleep(random.random())  # some blocking i/o work
```
```py3
async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        await asyncio.sleep(random.random())  # some i/o work
```

And running it shows there's not a difference (only in the "randomness" of `random.random`) compared to the `asyncio`-enabled approach:

```sh
$ python mandrill/mayhem_5.py
```
```
17:56:46,947 INFO: Published 1 of 5 messages
17:56:46,947 INFO: Published 2 of 5 messages
17:56:46,947 INFO: Published 3 of 5 messages
17:56:46,947 INFO: Published 4 of 5 messages
17:56:46,947 INFO: Published 5 of 5 messages
17:56:46,947 INFO: Consumed Message(inst_name='cattle-q10b')
17:56:47,318 INFO: Consumed Message(inst_name='cattle-n7eg')
17:56:48,204 INFO: Consumed Message(inst_name='cattle-mrij')
17:56:48,899 INFO: Consumed Message(inst_name='cattle-se82')
17:56:49,726 INFO: Consumed Message(inst_name='cattle-rkst')
```

Part of the problem could be that documentation and tutorial writers are presuming knowledge and the ability to extrapolate from over-simplified examples. But it's mainly because concurrency is just a difficult paradigm to grasp in general. We write code as we read anything: left-to-right, top-to-bottom. Most of us are just not used to the multitasking and context switching that our modern computers allow us. Hell, even if we are familiar with concurrent programming, understanding a concurrent system is hard.

<h1>Actually being concurrent</h1>
<img src="images/mandrill_babies.jpeg" alt="yelling mandrill" />

But we're not in over our heads yet. We can still make this simulated chaos monkey service *actually* concurrent in a rather simple way.

To reiterate our goal here: we want to build an event-driven service that consumes from a pub/sub, and processes messages as they come in. We could get thousands of messages in seconds, so as we get a message, we shouldn't block the handling of the next message we receive.

To help facilitate this, we'll also need to build a service that actually runs forever. We're not going to have a pre-set number of messages; we need to react whenever we're told to restart an instance. The triggering event to publish a restart request message could be an on-demand request from a service owner, or a scheduled gradually rolling restart of the fleet.

<h1><span style="font-size:50%;font-style:italic;color:gray;">Actually being concurrent</span><br/>Concurrent publisher</h1>

Let's first create a mock publisher that will always be publishing restart request messages, and therefore never indicate that it's done. 

In [None]:
async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        host_id = ''.join(random.choices(choices, k=4))
        msg = Message(
            msg_id=str(uuid.uuid4()), 
            inst_name=f'cattle-{host_id}')

        await queue.put(msg)
        logging.info(f'Published {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

This also means we're not providing a set number of messages to publish, so we have to rework that a bit, too.

Here I'm just adding the creation of a unique ID for each message produced.

And we try running it:

```sh
$ python mandrill/mayhem_6.py
```
<pre><code>
18:08:02,995 INFO: Published Message(inst_name='cattle-w8kz')
18:08:03,988 INFO: Published Message(inst_name='cattle-fr4o')
18:08:04,587 INFO: Published Message(inst_name='cattle-vlyg')
18:08:05,270 INFO: Published Message(inst_name='cattle-v6zu')
18:08:05,558 INFO: Published Message(inst_name='cattle-mws2')
^C18:08:05,903 INFO: Cleaning up
Traceback (most recent call last):
  File "mandrill/mayhem_6.py", line 60, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
<span class="asyncio-hl">KeyboardInterrupt</span>
</code></pre>

We're happily creating and publishing messages, but you'll notice that `KeyboardInterrupt` (triggered by the `^C`) is not actually caught.

In [None]:
if __name__ == '__main__':
    # <--snip-->
    try:
        loop.create_task(publish(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.stop()

Let's quickly clean up that traceback from the `KeyboardInterrupt`; it's a quick bandaid, as further explained later on.

So now we see:

```sh
$ python mandrill/mayhem_6.py
```
```
18:09:48,337 INFO: Published Message(inst_name='cattle-s8x2')
18:09:48,643 INFO: Published Message(inst_name='cattle-4aat')
^C18:09:49,83 INFO: Interrupted
18:09:49,83 INFO: Cleaning up
```

Fantastic! Much cleaner.

So, it's probably hard to see how this is concurrent right now. Let's add multiple producers to help see this fact.

In [None]:
async def publish(queue, publisher_id):
    choices = string.ascii_lowercase + string.digits
    while True:
        host_id = ''.join(random.choices(choices, k=4))
        msg = Message(
            msg_id=str(uuid.uuid4()), 
            inst_name=f'cattle-{host_id}')

        await queue.put(msg)
        logging.info(f'[{publisher_id}] Published {msg}')

        await asyncio.sleep(random.random())

Let's add unique publisher IDs so it's a bit easier to see the concurrency with multiple publishers.

In [None]:
if __name__ == '__main__':
    # <--snip-->
    # not that readable - sorry!
    coros = [
        handle_exception(publish(queue, i), loop) for i in range(1, 4)
    ]

    try:
        for coro in coros:
            loop.create_task(coro)
        loop.run_forever()

    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.stop()

We'll create three publishers real quick, and when running:

```sh
$ python mandrill/mayhem_7.py
```
<pre><code>
18:15:38,838 INFO: <span style="background:#f2dfd7">[1]</span> Published Message(inst_name='cattle-tnh8')
18:15:38,838 INFO: <span style="background:#8ac3ec">[2]</span> Published Message(inst_name='cattle-wyt2')
18:15:38,838 INFO: <span style="background:#FFFC79">[3]</span> Published Message(inst_name='cattle-kh0l')
18:15:39,119 INFO: <span style="background:#f2dfd7">[1]</span> Published Message(inst_name='cattle-5u61')
18:15:39,615 INFO: <span style="background:#FFFC79">[3]</span> Published Message(inst_name='cattle-mbvw')
18:15:39,689 INFO: <span style="background:#f2dfd7">[1]</span> Published Message(inst_name='cattle-80ro')
18:15:39,774 INFO: <span style="background:#8ac3ec">[2]</span> Published Message(inst_name='cattle-xlm4')
18:15:39,865 INFO: <span style="background:#f2dfd7">[1]</span> Published Message(inst_name='cattle-hlwx')
18:15:39,872 INFO: <span style="background:#8ac3ec">[2]</span> Published Message(inst_name='cattle-7l1v')
18:15:40,273 INFO: <span style="background:#FFFC79">[3]</span> Published Message(inst_name='cattle-gf6k')
18:15:40,294 INFO: <span style="background:#f2dfd7">[1]</span> Published Message(inst_name='cattle-iq3r')
^C18:15:40,637 INFO: Interrupted
18:15:40,637 INFO: Cleaning up
</code></pre>

Huzzah! 

For the rest of the walk through, I'll remove the multiple publishers; this was just to easily convey that it's now concurrent, not just non-blocking.

<h1><span style="font-size:50%;color:gray;font-style:italic;">Actually being concurrent</span><br/>Concurrent consumer</h1>

Now time to add concurrency to the consumer bit. For this, the goal is to constantly consume messages from the queue and create non-blocking work based off of a newly-consumed message; in this case, to restart an instance.

The tricky part is that the consumer needs to be written so that pulling a new message off the queue is separate from the work done on that message. In other words, we have to simulate being "event-driven" by regularly pulling for a message in the queue, since there's no way to trigger work based off of a new message becoming available in the queue (a.k.a. push-based). Remember that the publisher coroutine function is merely meant to simulate an external pub/sub like Google Cloud Pub/Sub (not promoting, just most familiar).

In [None]:
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')

Let's first mock the restart work that needs to be done on any consumed message

In [None]:
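async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # schedule the restart without waiting for it to finish,
        # so we can pull the next message right away
        asyncio.create_task(restart_host(msg))

We'll stick with our `while True` loop and await the next message on the queue, then hand it off to `restart_host` as a task so that pulling the next message isn't held up by the restart.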

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.stop()

Then adding it to our loop

```sh
$ python mandrill/mayhem_8.py
```
```
18:22:07,286 INFO: Pulled Message(inst_name='cattle-1dnw')
18:22:07,468 INFO: Pulled Message(inst_name='cattle-05iq')
18:22:07,715 INFO: Pulled Message(inst_name='cattle-auau')
18:22:07,863 INFO: Restarted cattle-05iq.example.net
18:22:07,987 INFO: Restarted cattle-1dnw.example.net
18:22:08,072 INFO: Pulled Message(inst_name='cattle-fbga')
18:22:08,119 INFO: Restarted cattle-auau.example.net
18:22:08,309 INFO: Restarted cattle-fbga.example.net
18:22:08,545 INFO: Pulled Message(inst_name='cattle-q9pl')
18:22:08,576 INFO: Pulled Message(inst_name='cattle-i4qb')
^C18:22:08,630 INFO: Interrupted
18:22:08,630 INFO: Cleaning up
```

Nice. We're now pulling for messages whenever they're available.

<h1><span style="font-size:50%;color:gray;font-style:italic;">Actually being concurrent</span><br/>Concurrent work</h1>

We may want to do more than one thing per message.

In [None]:
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')
    
async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')

For example, we'd like to store the message in a database for potentially replaying later as well as initiate a restart of the given host:

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # potentially not what you want
        await save(msg)
        await restart_host(msg)

Within the `consume` coroutine function, we _could_ just `await` on both coroutines serially

```sh
$ python mandrill/mayhem_9.py
```
<pre><code>
<span class="asyncio-hl">18:24:50,840 INFO: Pulled Message(inst_name='cattle-nbmv')</span>
<span class="asyncio-hl2">18:24:50,944 INFO: Pulled Message(inst_name='cattle-7npf')</span>
18:24:51,534 INFO: Pulled Message(inst_name='cattle-v8cl')
<span class="asyncio-hl">18:24:51,647 INFO: Saved Message(inst_name='cattle-nbmv') into database</span>
<span class="asyncio-hl2">18:24:51,671 INFO: Saved Message(inst_name='cattle-7npf') into database</span>
<span class="asyncio-hl">18:24:51,695 INFO: Restarted cattle-7npf.example.net</span>
<span class="asyncio-hl">18:24:51,789 INFO: Restarted cattle-nbmv.example.net</span>
18:24:51,909 INFO: Pulled Message(inst_name='cattle-788c')
18:24:52,361 INFO: Saved Message(inst_name='cattle-v8cl') into database
18:24:52,431 INFO: Saved Message(inst_name='cattle-788c') into database
18:24:52,784 INFO: Pulled Message(inst_name='cattle-275p')
18:24:52,842 INFO: Restarted cattle-788c.example.net
18:24:53,103 INFO: Restarted cattle-v8cl.example.net
18:24:53,534 INFO: Saved Message(inst_name='cattle-275p') into database
^C18:24:53,613 INFO: Interrupted
18:24:53,613 INFO: Cleaning up
</code></pre>

We can see that although it doesn't block the consumption of messages, `await save(msg)` blocks `await restart_host(msg)`. But, perhaps we don't _need_ to await these two coroutines one right after another. These two tasks don't necessarily need to depend on one another – completely side-stepping the potential concern/complexity of "should we restart a host if we fail to add the message to the database".

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        asyncio.create_task(save(msg))
        asyncio.create_task(restart_host(msg))

So let's treat them as such. Instead of awaiting them, we can call `asyncio.create_task` to have them scheduled on the loop, basically chucking them over to the loop for it to execute when it next can.
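A rough way to see the difference is to time both styles. This is only a sketch: `fake_io` is a made-up stand-in for `save` and `restart_host`, and the `asyncio.wait` at the end exists purely so the demo can measure the total time (the consumer in the talk doesn't wait on its tasks). It also keeps references to the tasks, since the event loop itself only holds weak references to them.

```python
import asyncio
import time


async def fake_io(label, finished):
    await asyncio.sleep(0.1)  # stand-in for a database write or a restart
    finished.append(label)


async def serial(msg):
    finished = []
    start = time.perf_counter()
    await fake_io(f'save {msg}', finished)
    await fake_io(f'restart {msg}', finished)  # starts only after save ends
    return time.perf_counter() - start, finished


async def scheduled(msg):
    finished = []
    start = time.perf_counter()
    # Both tasks are handed to the loop up front and run concurrently.
    tasks = [
        asyncio.create_task(fake_io(f'save {msg}', finished)),
        asyncio.create_task(fake_io(f'restart {msg}', finished)),
    ]
    await asyncio.wait(tasks)  # only so the demo can measure the total time
    return time.perf_counter() - start, finished


serial_elapsed, _ = asyncio.run(serial('cattle-1'))
scheduled_elapsed, _ = asyncio.run(scheduled('cattle-1'))
print(f'serial: {serial_elapsed:.2f}s, scheduled: {scheduled_elapsed:.2f}s')
```

The serial version pays for both sleeps back to back, while the scheduled version overlaps them.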

```sh
$ python mandrill/mayhem_10.py
```
```
18:49:22,114 INFO: Pulled Message(inst_name='cattle-7tsz')
18:49:22,219 INFO: Pulled Message(inst_name='cattle-1kgp')
18:49:22,272 INFO: Saved Message(inst_name='cattle-7tsz') into database
18:49:22,512 INFO: Restarted cattle-1kgp.example.net
18:49:22,640 INFO: Restarted cattle-7tsz.example.net
18:49:22,716 INFO: Saved Message(inst_name='cattle-1kgp') into database
18:49:22,998 INFO: Pulled Message(inst_name='cattle-1wdy')
18:49:23,043 INFO: Saved Message(inst_name='cattle-1wdy') into database
18:49:23,279 INFO: Pulled Message(inst_name='cattle-e9rl')
18:49:23,370 INFO: Restarted cattle-1wdy.example.net
18:49:23,479 INFO: Pulled Message(inst_name='cattle-crnh')
18:49:23,612 INFO: Saved Message(inst_name='cattle-crnh') into database
18:49:24,155 INFO: Restarted cattle-e9rl.example.net
18:49:24,173 INFO: Saved Message(inst_name='cattle-e9rl') into database
18:49:24,259 INFO: Pulled Message(inst_name='cattle-hbbd')
18:49:24,279 INFO: Restarted cattle-crnh.example.net
18:49:24,292 INFO: Pulled Message(inst_name='cattle-8mg0')
18:49:24,324 INFO: Saved Message(inst_name='cattle-hbbd') into database
18:49:24,550 INFO: Saved Message(inst_name='cattle-8mg0') into database
18:49:24,716 INFO: Pulled Message(inst_name='cattle-hyv1')
18:49:24,817 INFO: Saved Message(inst_name='cattle-hyv1') into database
^C18:49:25,17 INFO: Interrupted
18:49:25,18 INFO: Cleaning up
```

Yay!

## Aside: When you want serial work

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # potentially what you want
        last_restart = await last_restart_date(msg)
        if today - last_restart > max_days:
            await restart_host(msg)

As an aside, sometimes you want your work to happen serially.

Maybe you restart hosts that have an uptime of more than 7 days. Or maybe you should check the balance of an account before you debit it. Needing code to be serial, to have steps or dependencies, doesn't mean that it can't be asynchronous. The `await last_restart_date(msg)` will yield to the loop, but that doesn't mean `restart_host` will be the next thing that the loop executes. It just allows other things to happen outside of this coroutine.

<img src="images/mandrill_cover_eyes.jpg" alt="mandrill covering eyes" />

Yes I admit, this was a thing that wasn't immediately apparent to me at first.
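Here's a small sketch of that idea, under a few assumptions: `last_restart_date` is a hypothetical lookup with a hard-coded answer, the dates are made up, and `heartbeat` stands in for whatever else the loop has to do. The check and the restart stay strictly ordered, yet the heartbeat still gets to run in the gaps.

```python
import asyncio
import datetime

MAX_UPTIME = datetime.timedelta(days=7)


async def last_restart_date(hostname):
    # Hypothetical lookup; a real service might query an inventory API here.
    await asyncio.sleep(0.05)
    return datetime.date(2018, 1, 1)


async def restart_host(hostname):
    await asyncio.sleep(0.05)  # simulated i/o work


async def restart_if_stale(hostname, today, events):
    events.append('check started')
    last_restart = await last_restart_date(hostname)  # yields to the loop
    events.append('check finished')
    if today - last_restart > MAX_UPTIME:
        await restart_host(hostname)  # still strictly after the check
        events.append('restarted')


async def heartbeat(events):
    for _ in range(3):
        events.append('tick')  # unrelated work that runs in the gaps
        await asyncio.sleep(0.01)


async def main():
    events = []
    ticker = asyncio.create_task(heartbeat(events))
    await restart_if_stale('cattle-1', datetime.date(2018, 7, 6), events)
    await ticker
    return events


print(asyncio.run(main()))
```

The first `tick` lands between "check started" and "check finished": serial steps inside one coroutine, concurrency across coroutines.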

<h1><span style="font-size:50%;color:gray;font-style:italic;">Actually being concurrent</span><br/>Message cleanup</h1>

We've pulled a message from the queue, and fanned out work based off of that message.

In [None]:
def cleanup(msg, fut):
    logging.info(f'Done. Acked {msg}')

In [None]:
async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))

    callback = functools.partial(cleanup, msg)
    g_future.add_done_callback(callback)
    await g_future

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg))

Now we need to perform any finalizing work on that message; for example, acknowledging the message so it isn't re-delivered. 

We'll separate the pulling of a message from the creation of work based on it. Then we can make use of `asyncio.gather`, which returns a future that we can add a callback to:

```sh
$ python mandrill/mayhem_11.py
```
<pre><code>
19:00:27,747 INFO: Pulled Message(inst_name='cattle-xuf1')
19:00:27,848 INFO: Pulled Message(inst_name='cattle-kk87')
19:00:27,861 INFO: Restarted cattle-xuf1.example.net
19:00:28,061 INFO: Saved Message(inst_name='cattle-kk87') into database
19:00:28,244 INFO: Restarted cattle-kk87.example.net
<span class="asyncio-hl">19:00:28,245 INFO: Done. Acked Message(inst_name='cattle-kk87')</span>
19:00:28,572 INFO: Pulled Message(inst_name='cattle-pdej')
19:00:28,659 INFO: Saved Message(inst_name='cattle-xuf1') into database
<span class="asyncio-hl">19:00:28,659 INFO: Done. Acked Message(inst_name='cattle-xuf1')</span>
19:00:28,831 INFO: Saved Message(inst_name='cattle-pdej') into database
19:00:29,333 INFO: Pulled Message(inst_name='cattle-x9kz')
19:00:29,339 INFO: Pulled Message(inst_name='cattle-sicp')
19:00:29,455 INFO: Restarted cattle-pdej.example.net
<span class="asyncio-hl">19:00:29,455 INFO: Done. Acked Message(inst_name='cattle-pdej')</span>
19:00:29,506 INFO: Saved Message(inst_name='cattle-sicp') into database
19:00:29,617 INFO: Restarted cattle-sicp.example.net
<span class="asyncio-hl">19:00:29,617 INFO: Done. Acked Message(inst_name='cattle-sicp')</span>
19:00:29,795 INFO: Restarted cattle-x9kz.example.net
19:00:29,914 INFO: Saved Message(inst_name='cattle-x9kz') into database
<span class="asyncio-hl">19:00:29,914 INFO: Done. Acked Message(inst_name='cattle-x9kz')</span>
19:00:30,195 INFO: Pulled Message(inst_name='cattle-o501')
^C19:00:30,305 INFO: Interrupted
19:00:30,305 INFO: Cleaning up
</code></pre>

So once both the `save` and `restart_host` coroutines are complete, `cleanup` is called, signifying that the message is completely done.
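If the `functools.partial` bit looks like magic, this self-contained sketch may help. The return values and the module-level `acked` list are my additions so there's something to inspect; the shape of `cleanup` and `handle_message` follows the cells above.

```python
import asyncio
import functools

acked = []


async def save(msg):
    await asyncio.sleep(0.02)
    return f'saved {msg}'


async def restart_host(msg):
    await asyncio.sleep(0.01)
    return f'restarted {msg}'


def cleanup(msg, fut):
    # `fut` is the finished gather future. Its result follows the order the
    # awaitables were passed in, not the order in which they completed.
    acked.append((msg, fut.result()))


async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))
    # partial() pre-binds `msg`; asyncio supplies the future as the last arg.
    g_future.add_done_callback(functools.partial(cleanup, msg))
    await g_future


asyncio.run(handle_message('cattle-1'))
print(acked)
```

Note that `fut.result()` would re-raise if either coroutine had failed, so a real callback would need to guard against that.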

In [None]:
# let's try this again
async def cleanup(msg):
    logging.info(f'Done. Acked {msg}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(0)

In [None]:
async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)

I personally have an allergy to callbacks. As well, perhaps we need `cleanup` to be non-blocking. 

Another approach could be to just `await` it.

<h1><span style="font-size:50%;color:gray;font-style:italic;">Actually being concurrent</span><br/>Task to monitor other tasks</h1>

Now, much like Google's Pub/Sub, let's say that the publisher will redeliver a message after 10 seconds if it has not been acknowledged. We _are_ able to extend that "timeout" period or acknowledgement deadline for a message. In order to do that, we now have to have a coroutine that, in essence, monitors all the other worker tasks. While they're still continuing to work, this coroutine will extend the message acknowledgement deadline; then once they're done, it should stop extending and clean up the message.

In [None]:
async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline 3s {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)
    else:
        await cleanup(msg)

In [None]:
async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

One approach is to make use of [`asyncio.Event`](https://docs.python.org/3/library/asyncio-sync.html#event) primitives.
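As a scaled-down sketch of the same idea (the `log` list, the shortened intervals, and the plain `sleep` standing in for `save` and `restart_host` are all assumptions so it finishes quickly):

```python
import asyncio


async def extend(msg, event, log, interval=0.02):
    while not event.is_set():
        log.append(f'extended {msg}')
        await asyncio.sleep(interval)  # shorter than the ack deadline
    log.append(f'acked {msg}')  # stands in for cleanup(msg)


async def handle_message(msg, log):
    event = asyncio.Event()
    extender = asyncio.create_task(extend(msg, event, log))
    await asyncio.sleep(0.05)  # stand-in for save() and restart_host()
    event.set()
    await extender  # only so the demo can inspect the complete log


async def main():
    log = []
    await handle_message('cattle-1', log)
    return log


print(asyncio.run(main()))
```

One thing this makes visible: `extend` only notices the event once its current sleep is over, so the ack can lag behind the end of the work by up to one interval.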

```sh
$ python mandrill/mayhem_12.py
```
<pre><code>
19:04:29,602 INFO: Pulled Message(inst_name='cattle-g7hy')
<span class="asyncio-hl">19:04:29,603 INFO: Extended deadline 3s Message(inst_name='cattle-g7hy')</span>
19:04:29,692 INFO: Saved Message(inst_name='cattle-g7hy') into database
19:04:30,439 INFO: Pulled Message(inst_name='cattle-wv21')
<span class="asyncio-hl">19:04:30,440 INFO: Extended deadline 3s Message(inst_name='cattle-wv21')</span>
19:04:30,605 INFO: Restarted cattle-g7hy.example.net
19:04:31,100 INFO: Saved Message(inst_name='cattle-wv21') into database
19:04:31,203 INFO: Pulled Message(inst_name='cattle-40w2')
<span class="asyncio-hl">19:04:31,203 INFO: Extended deadline 3s Message(inst_name='cattle-40w2')</span>
19:04:31,350 INFO: Pulled Message(inst_name='cattle-ouqk')
<span class="asyncio-hl">19:04:31,350 INFO: Extended deadline 3s Message(inst_name='cattle-ouqk')</span>
19:04:31,445 INFO: Saved Message(inst_name='cattle-40w2') into database
19:04:31,775 INFO: Done. Acked Message(inst_name='cattle-g7hy')
19:04:31,919 INFO: Saved Message(inst_name='cattle-ouqk') into database
19:04:32,184 INFO: Pulled Message(inst_name='cattle-oqxz')
<span class="asyncio-hl">19:04:32,184 INFO: Extended deadline 3s Message(inst_name='cattle-oqxz')</span>
19:04:32,207 INFO: Restarted cattle-40w2.example.net
19:04:32,356 INFO: Restarted cattle-ouqk.example.net
<span class="asyncio-hl">19:04:32,441 INFO: Extended deadline 3s Message(inst_name='cattle-wv21')</span>
19:04:32,441 INFO: Restarted cattle-wv21.example.net
19:04:32,559 INFO: Saved Message(inst_name='cattle-oqxz') into database
19:04:32,661 INFO: Done. Acked Message(inst_name='cattle-40w2')
^C19:04:32,812 INFO: Interrupted
19:04:32,813 INFO: Cleaning up
</code></pre>

Running this, we can see we're extending while work continues, and cleaning up once done:

In [None]:
# another approach
async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    logging.info(f'Done. Acked {msg}')

If you _love_ events, you could even make use of `event.wait`, and move the `cleanup` call from `extend` to within `handle_message`.

In [None]:
async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline 3s {msg}')
        await asyncio.sleep(2)

In [None]:
async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

Well, alright then! We got some concurrency! Let's continue with the complexity.

<h1><span style="font-size:50%;font-style:italic;color:gray;">TL;DR: Actually being concurrent</span></h1>

* Asynchronous != concurrency

* Serial != blocking

asyncio is pretty easy to use, but being easy to use doesn't automatically mean you're using it correctly. You can't just throw `async` and `await` keywords around blocking code. It's a shift in mental paradigm: you need to think about what work can be farmed out and left to do its thing, what dependencies there are, and where your code might still need to be sequential.

But having steps within your code, having "first A, then B, then C" may seem like it's blocking when it's not. Sequential code can still be asynchronous. I might have to call customer service for something, and wait to be taken off hold to talk to them, but while I wait, I can put the phone on speaker and pet my super needy cat. I might be single-threaded as a person, but I can multi-task like CPUs.

# Graceful Shutdown
<img src="images/mandrill_sleep.jpg" style="height:400px">

Earlier, we added a try/except/finally around our main event loop code. Often though, you'll want your service to gracefully shut down if it receives a signal of some sort, e.g. clean up open database connections, stop consuming messages, finish responding to current requests while not accepting new requests, etc. So, if we happen to restart an instance of our _own_ service, we should clean up the "mess" we've made before exiting out.

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.stop()

We've been catching the commonly-known `KeyboardInterrupt` exception like many other tutorials and libraries. 

<h1><span style="font-size:50%;font-style:italic;color:gray;">Graceful Shutdown:</span><br/>Common Signals</h1>

_via_ `man signal`

* `SIGHUP` - Hangup detected on controlling terminal or death of controlling process
* `SIGQUIT` - Quit from keyboard (via `^\`)
* `SIGTERM` - Termination signal
* `SIGINT` - Interrupt program

Should not be caught:
* `SIGKILL` - kill program (the familiar `kill -9`)
* `SIGSTOP` - stop process

But there are many common signals that a service should expect and handle. A few typical ones are `SIGHUP`, `SIGQUIT`, and `SIGTERM`.

There's also `SIGKILL` (i.e. the familiar `kill -9`) and `SIGSTOP`, although the standard is that they can't be caught, blocked, or ignored.

```sh
$ python mandrill/mayhem_13.py
```

```sh
$ pkill -TERM -f "python mandrill/mayhem_13.py"
```

<pre><code>
19:08:25,553 INFO: Pulled Message(inst_name='cattle-npww')
19:08:25,554 INFO: Extended deadline 3s Message(inst_name='cattle-npww')
19:08:25,655 INFO: Pulled Message(inst_name='cattle-rm7n')
19:08:25,655 INFO: Extended deadline 3s Message(inst_name='cattle-rm7n')
19:08:25,790 INFO: Saved Message(inst_name='cattle-rm7n') into database
19:08:25,831 INFO: Saved Message(inst_name='cattle-npww') into database
<span class="asyncio-hl">[1]    78851 terminated  python mandrill/mayhem_13.py</span>
</code></pre>

We see that we don't reach the `finally` clause, where we log that we're cleaning up and stop the loop.
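This isn't specific to `asyncio`. As a Unix-only sketch, the script below starts a tiny child process with the same `try`/`except`/`finally` shape, waits until it is inside the `try`, and sends it `SIGTERM`. The child program and the `run_and_signal` helper are made up for the demonstration.

```python
import signal
import subprocess
import sys

# A tiny stand-in for the service: same try/except/finally shape as above.
CHILD = '''
import time
try:
    print("ready", flush=True)
    time.sleep(30)
except KeyboardInterrupt:
    print("Interrupted", flush=True)
finally:
    print("Cleaning up", flush=True)
'''


def run_and_signal(sig):
    """Start the child, wait until it is inside `try`, then send `sig`."""
    proc = subprocess.Popen(
        [sys.executable, '-c', CHILD], stdout=subprocess.PIPE, text=True)
    proc.stdout.readline()  # blocks until the child has printed "ready"
    proc.send_signal(sig)
    output, _ = proc.communicate(timeout=10)
    return proc.returncode, output


returncode, output = run_and_signal(signal.SIGTERM)
print(returncode, repr(output))
```

With the default `SIGTERM` disposition the process is simply terminated: the return code is the negative signal number and "Cleaning up" never shows up in the output. Python only turns `SIGINT` into an exception (`KeyboardInterrupt`) for us.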

<h1><span style="font-size:50%;font-style:italic;color:gray;">Graceful Shutdown:</span><br/>Gotta catch 'em all</h1>

In [None]:
if __name__ == '__main__':
    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue))
    consumer_coro = handle_exception(consume(queue))

    loop = asyncio.get_event_loop()  # <-- could happen here or earlier
    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except Exception:
        logging.error('Caught exception')    # <-- could happen here 
    except KeyboardInterrupt:
        logging.info('Process interrupted')  # <-- could happen here 
    finally:
        logging.info('Cleaning up')          # <-- could happen here 
        loop.stop()                          # <-- could happen here 

It should also be pointed out that – even if we were to only ever expect a `KeyboardInterrupt` / `SIGINT` signal – it could happen outside the catching of the exception, potentially causing the service to end up in an incomplete or otherwise unknown state:

<h1><span style="font-size:50%;font-style:italic;color:gray;">Graceful Shutdown:</span><br/>Using a signal handler</h1>

So, instead of catching `KeyboardInterrupt`, let's attach a signal handler to the loop.

#### Defining shutdown behavior

```py3
async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')
    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    for task in tasks:
        task.cancel()

    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    await asyncio.gather(*tasks)
    loop.stop()
    logging.info('Shutdown complete.')
```

Here I'm just closing the simulated database connections, returning messages to pub/sub as not acknowledged (so they can be redelivered and not dropped), and finally cancelling the tasks. We don't necessarily need to cancel pending tasks; we could just collect and allow them to finish. We may also want to take this opportunity to flush any collected metrics so they're not lost.

In [None]:
if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

Let's hook this up to the main event loop now.

I also removed the `KeyboardInterrupt` catch since that's now taken care of within the signal handling.
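For something you can run end to end without a second terminal, here's a condensed, Unix-only sketch of the same wiring. A few things in it are assumptions rather than the talk's code: `worker` replaces the publisher and consumer, the process sends itself `SIGTERM` via `os.kill` to stand in for `pkill`, a `log` list replaces `logging`, and `return_exceptions=True` is added to the `gather` so the cancellations don't propagate out of `shutdown`. It needs to run in the main thread, since that's where signal handlers can be installed.

```python
import asyncio
import os
import signal


async def worker(name, log):
    try:
        while True:
            await asyncio.sleep(0.01)  # simulated endless work
    except asyncio.CancelledError:
        log.append(f'{name} cancelled')
        raise


async def shutdown(sig, loop, log):
    log.append(f'received {sig.name}')
    tasks = [t for t in asyncio.all_tasks()
             if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    # Collect the CancelledErrors as results instead of letting them raise.
    await asyncio.gather(*tasks, return_exceptions=True)
    log.append('shutdown complete')
    loop.stop()


def run_service():
    log = []
    loop = asyncio.new_event_loop()
    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig, lambda sig=sig: loop.create_task(shutdown(sig, loop, log)))
    loop.create_task(worker('publisher', log))
    loop.create_task(worker('consumer', log))
    # Simulate `pkill -TERM` by signalling our own process shortly after start.
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
    try:
        loop.run_forever()
    finally:
        loop.close()
    return log


print(run_service())
```

The handlers are registered before any work is scheduled, so there's no window in which a signal could arrive unhandled once the loop is running.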

```sh
$ python mandrill/mayhem_14.py
```

```sh
# or -HUP or -INT
$ pkill -TERM -f "python mandrill/mayhem_14.py"
```

<pre><code>
19:11:25,321 INFO: Pulled Message(inst_name='cattle-lrnm')
19:11:25,321 INFO: Extended deadline 3s Message(inst_name='cattle-lrnm')
19:11:25,700 INFO: Pulled Message(inst_name='cattle-m0f6')
19:11:25,700 INFO: Extended deadline 3s Message(inst_name='cattle-m0f6')
19:11:25,740 INFO: Saved Message(inst_name='cattle-m0f6') into database
19:11:25,840 INFO: Saved Message(inst_name='cattle-lrnm') into database
19:11:26,143 INFO: Received exit signal SIGTERM...
19:11:26,143 INFO: Closing database connections
19:11:26,144 INFO: Cancelling outstanding tasks
<span class="asyncio-hl">19:11:26,144 ERROR: Caught exception</span>
<span class="asyncio-hl">19:11:26,144 ERROR: Caught exception</span>
19:11:26,144 INFO: Cleaning up
</code></pre>

Let's run this again.

It looks like we hit `"Caught exception"` twice. This is because awaiting on cancelled tasks will raise `asyncio.CancelledError`, which is expected. Let's add that to `handle_exception` as well:

In [None]:
async def handle_exception(coro, loop):
    try:
        # `coro` is already a coroutine object, e.g. publish(queue)
        await coro
    except asyncio.CancelledError:
        logging.info('Coroutine cancelled')
    except Exception:
        logging.error('Caught exception')
    finally:
        loop.stop()

```sh
$ python mandrill/mayhem_14.py
```

```sh
$ pkill -INT -f "python mandrill/mayhem_14.py"
```

<pre><code>
19:22:10,47 INFO: Pulled Message(inst_name='cattle-1zsx')
19:22:10,47 INFO: Extended deadline 3s Message(inst_name='cattle-1zsx')
^C19:22:10,541 INFO: Received exit signal SIGINT...
19:22:10,541 INFO: Closing database connections
19:22:10,541 INFO: Cancelling outstanding tasks
<span class="asyncio-hl">19:22:10,541 INFO: Coroutine cancelled</span>
<span class="asyncio-hl">19:22:10,541 INFO: Coroutine cancelled</span>
19:22:10,541 INFO: Cleaning up
</code></pre>

So now we see our coroutines are cancelled and not some random exception.
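One caveat if you're on a newer Python than the 3.7 used here: since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so `except Exception` on its own no longer catches it and the explicit clause is what does the work. The sketch below exercises both branches; `forever`, `boom`, and the `outcome` list are made-up stand-ins.

```python
import asyncio


async def handle_exception(coro, outcome):
    try:
        await coro
    except asyncio.CancelledError:
        outcome.append('cancelled')  # expected during shutdown
    except Exception:
        outcome.append('error')  # anything else is a real problem


async def forever():
    await asyncio.sleep(3600)


async def boom():
    raise RuntimeError('boom')


async def main():
    outcome = []
    waiting = asyncio.create_task(handle_exception(forever(), outcome))
    failing = asyncio.create_task(handle_exception(boom(), outcome))
    await asyncio.sleep(0)  # let both tasks start running
    waiting.cancel()
    await asyncio.gather(waiting, failing)
    return outcome


print(asyncio.run(main()))
```

Because the wrapper swallows the `CancelledError`, the wrapped task finishes "normally" rather than as cancelled. That's fine for a top-level wrapper like this one, but it's worth re-raising in code that others might cancel and wait on.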

<h1><span style="font-size:50%;font-style:italic;color:gray;">Graceful Shutdown:</span><br/>Which signals to care about</h1>

<table class='table table-striped'> <thead> <tr> <th></th> <th>Hard Exit</th> <th>Graceful</th> <th>Reload/Restart</th> </tr> </thead> <tbody> <tr> <th scope='row'>nginx</th> <td><code>TERM</code>, <code>INT</code></td> <td><code>QUIT</code></td> <td><code>HUP</code></td> </tr> <tr> <th scope='row'>Apache</th> <td><code>TERM</code></td> <td><code>WINCH</code></td> <td><code>HUP</code></td> </tr> <tr> <th scope='row'>uWSGI</th> <td><code>INT</code>, <code>QUIT</code></td> <td></td> <td><code>HUP</code>, <code>TERM</code></td> </tr> <tr><th scope='row'>Gunicorn</th> <td><code>INT</code>, <code>QUIT</code></td> <td><code>TERM</code></td> <td><code>HUP</code></td> </tr> <tr><th scope='row'>Docker</th> <td><code>KILL</code></td> <td><code>TERM</code></td> <td></td> </tr> </tbody> </table>

Apparently there's no standard. Basically, you should be aware of how you're running your service, and handle accordingly. 

If you run your services within a Docker container, then you might want to consider including the handling of `SIGTERM` since that is what's sent during a `docker stop`. `docker kill` sends – you guessed it – a `SIGKILL` to the main process inside the container.

If the service is sitting behind nginx, then consider handling [`SIGQUIT` to piggy-back](http://nginx.org/en/docs/control.html) for a graceful shutdown, and `SIGTERM` and `SIGINT` for an immediate exit. `SIGHUP` will actually reload configuration.

For Apache, you'd use [`SIGWINCH`](http://httpd.apache.org/docs/2.2/stopping.html#gracefulstop) for a graceful shutdown, `SIGTERM` for an immediate exit, and `SIGHUP` for restarting. 

When using [`uWSGI`](https://uwsgi-docs.readthedocs.io/en/latest/Management.html#signals-for-controlling-uwsgi), know that `SIGHUP` and `SIGTERM` actually reload the workers and master process (graceful and immediate, respectively). To actually stop the uWSGI stack, you'll need to use `SIGINT` or `SIGQUIT`, although I don't see a way to do it gracefully.

Unlike `uWSGI`, Gunicorn allows for a graceful shutdown with `SIGTERM`. Use `SIGQUIT` and `SIGINT` for an immediate exit, and `SIGHUP` for reloading.

```sh
$ docker kill --signal SIGTERM <container>
```

Now if you're running these within a Docker container alongside your Python app, you'll have to [provide the desired signal with the `kill` command](https://docs.docker.com/engine/reference/commandline/kill/), e.g. `docker kill --signal SIGTERM <container>`. 

<h1><span style="font-size:50%;font-style:italic;color:gray;">Graceful Shutdown:</span><br/>Heads up: <code>asyncio.shield</code> isn't graceful</h1>

Another misleading API in `asyncio` is [`asyncio.shield`](https://docs.python.org/3/library/asyncio-task.html#asyncio.shield). The docs say it's a means to shield a future from cancellation. But if you have a coroutine that must not be cancelled during shutdown, `asyncio.shield` will not help you.

This is because the task that `asyncio.shield` creates gets included in `asyncio.all_tasks`, and therefore receives the cancellation signal like the rest of the tasks.

In [None]:
async def cant_stop_me():
    logging.info('Hold on...')
    await asyncio.sleep(60)
    logging.info('Done!')

To help illustrate, let's have a simple async function with a long sleep that we want to shield.

In [None]:
if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    shielded_coro = asyncio.shield(cant_stop_me())

    try:
        loop.run_until_complete(shielded_coro)
    finally:
        logging.info('Cleaning up')
        loop.stop()

<pre><code>
13:24:20,105 INFO: Hold on...
^C13:24:21,156 INFO: Received exit signal SIGINT...
13:24:21,156 INFO: Cancelling 2 outstanding tasks
<span class="asyncio-hl">13:24:21,156 INFO: Coroutine cancelled</span>
13:24:21,157 INFO: Cleaning up
Traceback (most recent call last):
  File "examples/shield_test.py", line 62, in <module>
    loop.run_until_complete(shielded_coro)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError
</code></pre>

We see that we don't get to the "Done" log line, that it's immediately cancelled.
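To separate what `asyncio.shield` does protect from what it doesn't, here's a sketch with two scenarios. The `waiter` coroutine, the `log` list, and the shortened sleep are assumptions for the demo. In the first scenario only the awaiting task is cancelled; in the second, everything from `asyncio.all_tasks` is cancelled, as our `shutdown` does.

```python
import asyncio


async def cant_stop_me(log):
    try:
        await asyncio.sleep(0.05)
        log.append('done')
    except asyncio.CancelledError:
        log.append('inner cancelled')
        raise


async def waiter(inner):
    await asyncio.shield(inner)


async def cancel_waiter_only():
    log = []
    inner = asyncio.create_task(cant_stop_me(log))
    outer = asyncio.create_task(waiter(inner))
    await asyncio.sleep(0.01)
    outer.cancel()  # shield absorbs this: `inner` keeps running
    await asyncio.gather(inner, outer, return_exceptions=True)
    return log


async def cancel_everything():
    log = []
    inner = asyncio.create_task(cant_stop_me(log))
    outer = asyncio.create_task(waiter(inner))
    await asyncio.sleep(0.01)
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task():
            task.cancel()  # hits `inner` directly, bypassing the shield
    await asyncio.gather(inner, outer, return_exceptions=True)
    return log


print(asyncio.run(cancel_waiter_only()))
print(asyncio.run(cancel_everything()))
```

So the shield works as documented when the cancellation comes through the awaiting side. It just can't help when the shielded task itself is cancelled, which is exactly what a blanket "cancel all tasks" shutdown does.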

<h1><span style="font-size:50%;font-style:italic;color:gray;">TL;DR: Graceful Shutdown:</span></h1>

* `try`/`except`/`finally` isn't enough

* Use signal handlers

* Listen for the appropriate signals

We don't have any nurseries in `asyncio` core to clean ourselves up; it's up to us to be responsible and close up the connections and files we opened, respond to outstanding requests, basically leave things how we found them.

Doing our cleanup in a `finally` clause isn't enough, though, since a signal could be sent outside of the try/except clause.

So as we construct the loop, we should tell it how it should be deconstructed, as early as possible in the program. This ensures that "all our bases are covered", that we're not leaving artifacts anywhere.

And finally, we also need to be aware of when our program should shut down, which is closely tied to how we run our program. If it's a manually run script, then `SIGINT` is fine. But if it's within a daemonized Docker container, then `SIGTERM` is more appropriate.

<h1>Exception Handling</h1>
<img src="images/surprised_mandrill.jpg" style="height:400px">

You may have noticed that, while we're catching exceptions on the top level, we're not paying any mind to exceptions that could be raised from within coroutines like `restart_host`, `save`, etc. 

In [None]:
async def restart_host(msg):
    # faked error: randrange(1, 4) yields 1, 2, or 3
    rand_int = random.randrange(1, 4)
    if rand_int == 3:
        raise Exception(f'Could not restart {msg.hostname}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1,3))
    logging.info(f'Restarted {msg.hostname}')

To show you what I mean, let's fake an error where we can't restart a host.

Running it, we see:

```sh
$ python mandrill/mayhem_15.py
```
<pre><code>
08:55:58,122 INFO: Pulled Message(inst_name='cattle-tx09')
08:55:58,122 INFO: Extended deadline 3s Message(inst_name='cattle-tx09')
08:55:58,123 ERROR: Could not restart cattle-tx09.example.net
<span class="asyncio-hl">08:55:58,123 ERROR: Task exception was never retrieved</span>
future: <Task finished coro=<handle_message() done, defined at mandrill/mayhem_15.py:72> exception=Exception('Could not restart cattle-tx09.example.net')>
Traceback (most recent call last):
  File "mandrill/mayhem_15.py", line 82, in handle_message
    await asyncio.gather(save_coro, restart_coro)
  File "mandrill/mayhem_15.py", line 49, in restart_host
    raise Exception(f'Could not restart {msg.hostname}')
Exception: Could not restart cattle-tx09.example.net
08:55:58,904 INFO: Saved Message(inst_name='cattle-tx09') into database
08:56:00,127 INFO: Extended deadline 3s Message(inst_name='cattle-tx09')
</code></pre>

We see that `cattle-tx09.example.net` could not be restarted. While the service doesn't crash and it did save the message in the database, it did not clean up and `ack` the message. The `extend` on the message deadline will also keep spinning, so we've essentially deadlocked on the message.

In [None]:
async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True)
    event.set()

The simple thing to do is add `return_exceptions=True` to `asyncio.gather`, so rather than completely dropping an exception, it's returned along with the successful results:

```sh
$ python mandrill/mayhem_15.py
```
```
09:08:50,658 INFO: Pulled Message(inst_name='cattle-4f52')
09:08:50,659 INFO: Extended deadline 3s Message(inst_name='cattle-4f52')
09:08:51,025 INFO: Pulled Message(inst_name='cattle-orj0')
09:08:51,025 INFO: Extended deadline 3s Message(inst_name='cattle-orj0')
09:08:51,497 INFO: Pulled Message(inst_name='cattle-f4nw')
09:08:51,497 INFO: Extended deadline 3s Message(inst_name='cattle-f4nw')
09:08:51,626 INFO: Saved Message(inst_name='cattle-4f52') into database
09:08:51,706 INFO: Saved Message(inst_name='cattle-orj0') into database
09:08:51,723 INFO: Done. Acked Message(inst_name='cattle-4f52')
09:08:52,009 INFO: Saved Message(inst_name='cattle-f4nw') into database
09:08:52,409 INFO: Pulled Message(inst_name='cattle-dft2')
09:08:52,410 INFO: Extended deadline 3s Message(inst_name='cattle-dft2')
09:08:52,444 INFO: Saved Message(inst_name='cattle-dft2') into database
09:08:52,929 INFO: Done. Acked Message(inst_name='cattle-dft2')
09:08:52,930 INFO: Pulled Message(inst_name='cattle-ft4h')
09:08:52,930 INFO: Extended deadline 3s Message(inst_name='cattle-ft4h')
09:08:53,029 INFO: Extended deadline 3s Message(inst_name='cattle-orj0')
09:08:53,30 INFO: Restarted cattle-orj0.example.net
```

In [None]:
def handle_results(results):
    for result in results:
        if isinstance(result, Exception):
            logging.error(f'Caught exception: {result}')

In [None]:
async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True
    )
    handle_results(results)
    event.set()

We _could_ add a callback via `add_done_callback` to the `asyncio.gather` future, but as I said, I'm allergic. We can just process the results afterwards:

```sh
$ python mandrill/mayhem_15.py
```
<pre><code>
09:27:48,143 INFO: Pulled Message(inst_name='cattle-gas8')
09:27:48,144 INFO: Extended deadline 3s Message(inst_name='cattle-gas8')
09:27:48,644 INFO: Pulled Message(inst_name='cattle-arpg')
09:27:48,645 INFO: Extended deadline 3s Message(inst_name='cattle-arpg')
09:27:48,880 INFO: Saved Message(inst_name='cattle-gas8') into database
<span class="asyncio-hl">09:27:48,880 ERROR: Caught exception: Could not restart cattle-gas8.example.net</span>
09:27:49,385 INFO: Pulled Message(inst_name='cattle-4nl3')
09:27:49,385 INFO: Extended deadline 3s Message(inst_name='cattle-4nl3')
09:27:49,503 INFO: Saved Message(inst_name='cattle-arpg') into database
<span class="asyncio-hl">09:27:49,504 ERROR: Caught exception: Could not restart cattle-arpg.example.net</span>
09:27:49,656 INFO: Pulled Message(inst_name='cattle-4713')
09:27:49,656 INFO: Extended deadline 3s Message(inst_name='cattle-4713')
09:27:49,734 INFO: Saved Message(inst_name='cattle-4nl3') into database
<span class="asyncio-hl">09:27:49,734 ERROR: Caught exception: Could not restart cattle-4nl3.example.net</span>
09:27:49,747 INFO: Done. Acked Message(inst_name='cattle-gas8')
</code></pre>

<h1><span style="font-size:50%;font-style:italic;color:gray;">TL;DR: Exception Handling</span></h1>

* Exceptions – handled or not – do not crash the program

* `asyncio.gather` will swallow exceptions by default

Unlike in non-asyncio programs, exceptions will not crash the system, and they might go unnoticed. So we need to account for that.

I personally like using `asyncio.gather` because the order of the returned results is deterministic, but it's easy to get tripped up with it. By default, it raises only the first exception to whoever awaits it, and happily continues working on the other tasks that were given. If that exception is never handled or returned, weird behavior can happen, like spinning around an event.
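Side by side, the two `gather` behaviours look like this. It's a minimal sketch: the `save` and `restart_host` bodies here are simplified stand-ins with shortened sleeps, and `restart_host` always fails.

```python
import asyncio


async def save(msg):
    await asyncio.sleep(0.02)
    return f'saved {msg}'


async def restart_host(msg):
    await asyncio.sleep(0.01)
    raise RuntimeError(f'Could not restart {msg}')


async def default_gather(msg):
    try:
        return await asyncio.gather(save(msg), restart_host(msg))
    except RuntimeError as exc:
        # Only the first exception reaches us; save()'s result is lost here.
        return f'raised: {exc}'


async def collecting_gather(msg):
    # Exceptions come back in position, next to the successful results.
    return await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True)


print(asyncio.run(default_gather('cattle-1')))
print(asyncio.run(collecting_gather('cattle-1')))
```

With `return_exceptions=True`, nothing is raised at the `await`, so it's on us to walk the results and check for exception instances, as `handle_results` does.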

# Making synchronous code `asyncio`-friendly
<img src="images/mandrill_bored.jpg" style="height:400px">

I'm sure that as folks have started to use `asyncio`, they've realized that `async`/`await` starts infecting everything around the codebase; everything needs to be async. This isn't necessarily a bad thing; it just forces a shift in perspective.

In [None]:
def save_sync(msg):
    # unhelpful simulation of blocking i/o work
    time.sleep(random.random())
    logging.info(f'[blocking] Saved {msg} into database')

In [None]:
async def handle_message(msg, executor, loop):
    event = asyncio.Event()
    save_coro = loop.run_in_executor(executor, save_sync, msg)

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save_coro, restart_host(msg), return_exceptions=True
    )
    handle_results(results)
    event.set()

In [None]:
async def consume(queue):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    loop = asyncio.get_running_loop()
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg, executor, loop))

But if you're very lucky, you'll need to use [third-party code](https://googlecloudplatform.github.io/google-cloud-python/latest/pubsub/subscriber/index.html) that blocks. To simulate this, I'm making use of the synchronous publish & consume code from earlier.

In [None]:
async def consume(executor, queue, loop):
    while True:
        msg = await loop.run_in_executor(executor, consume_sync, queue)
        if not msg:  # could be None
            continue
        asyncio.create_task(handle_message(msg))

In [None]:
if __name__ == '__main__':
    # <--snip-->
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    consumer_coro = consume(executor, queue, loop)
    # <--snip-->

So for our code to work with this, we need to rework our async consumer. Not much is needed, actually. Here I'm still making use of our own asynchronous `consume` coroutine to call the non-async consumer.

Aside: There's a handy little package called [asyncio-extras](https://asyncio-extras.readthedocs.io/en/latest/) which provides a decorator for synchronous functions/methods. You can avoid the boilerplate of setting up an executor and just `await` the decorated function.
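If you'd rather not pull in a dependency, a hand-rolled version of that idea could look roughly like this. To be clear, this is my own sketch and not how asyncio-extras implements it; `run_in_thread` is a hypothetical name, and it leans on the loop's default executor.

```python
import asyncio
import functools
import time


def run_in_thread(func):
    """Hypothetical helper: make a blocking function awaitable."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        # Passing None means "use the loop's default ThreadPoolExecutor"
        return await loop.run_in_executor(None, call)
    return wrapper


@run_in_thread
def save_sync(msg):
    time.sleep(0.05)  # stand-in for blocking I/O
    return f'Saved {msg}'


async def main():
    # Both blocking calls run in worker threads at the same time
    return await asyncio.gather(save_sync('a'), save_sync('b'))


saved = asyncio.run(main())
print(saved)
```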

## Making threaded code `asyncio`-<strike>friendly</strike> tolerable
<img src="images/surprised_mandrill_2.jpg" style="height:400px">

But sometimes, third-party code throws a wrench at you...

If you're lucky, you'll be faced with a third-party library that is multi-threaded _and_ blocking. For example, Google Pub/Sub's Python library makes use of gRPC under the hood via threading, but [it also blocks](https://googlecloudplatform.github.io/google-cloud-python/latest/pubsub/subscriber/index.html#subscription-callbacks) when we're opening up a subscription. The library also requires a non-async callback (:grimace:) for when a message is received.

In [None]:
def callback(msg):
    msg.ack()
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')

def consume_sync():
    client = get_subscriber_client()  # helper func
    future = client.subscribe(SUBSCRIPTION, callback)

    try:
        future.result()  # blocking
    except Exception as e:
        logging.error(f'Caught exception: {e}')

In typical Google fashion, they'll stuff some uber-cool technology in a difficult-to-work-with library. The future that's returned makes use of gRPC for bidirectional communication and removes our need to periodically pull for messages as well as manage message deadlines.

To illustrate, here's how we can use `loop.run_in_executor` for this blocking code:

In [None]:
async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    consume_coro = loop.run_in_executor(executor, consume_sync)

    asyncio.ensure_future(consume_coro)
    loop.create_task(publish(executor, loop))

I've made a helper coroutine function to set up an executor, use it to kick off the synchronous consumer, and pass it off to my async publisher to use for its non-async work.

In [None]:
async def run_something_else():
    while True:
        logging.info('Running something else')
        await asyncio.sleep(random.random())

async def run():
    coros = [run_pubsub(), run_something_else()]
    await asyncio.gather(*coros)

I'd like to also prove that this is now non-blocking, so let's add a dummy coroutine function to be run alongside `run_pubsub`. We'll add two coroutine functions, and update the `__main__` section to just run the `run` coroutine function.
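If you want to try this without a Pub/Sub subscription, here's a stripped-down sketch of the same proof. The `consume_sync` here is just a `time.sleep` stand-in for the blocking subscriber, and the tick list is something I added so there's a number to look at.

```python
import asyncio
import concurrent.futures
import time


def consume_sync():
    time.sleep(0.3)  # stand-in for the blocking subscriber call
    return 'consumer finished'


async def run_something_else(ticks):
    # This only keeps ticking if the event loop is free to run it
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def run():
    loop = asyncio.get_running_loop()
    ticks = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        ticker = asyncio.create_task(run_something_else(ticks))
        # The blocking call runs in a worker thread; we only await its future
        result = await loop.run_in_executor(executor, consume_sync)
        ticker.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(run())
print(result, tick_count)
```

If the blocking call had been made directly inside a coroutine, the ticker would not have gotten a single turn until it returned.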

```sh
$ python examples/mandrill/mayhem_19.py
```
```
17:24:09,613 INFO: Running something else
17:24:09,716 INFO: Consumed 6tal
17:24:09,716 INFO: Consumed k5yg
17:24:09,716 INFO: Consumed 0m4d
17:24:09,717 INFO: Running something else
17:24:09,820 INFO: Running something else
17:24:09,822 INFO: Consumed qiwg
17:24:09,822 INFO: Consumed pha7
17:24:09,822 INFO: Consumed ec9c
17:24:09,924 INFO: Running something else
17:24:09,929 INFO: Consumed 8mgt
17:24:09,929 INFO: Consumed x6u3
17:24:09,929 INFO: Consumed 1kue
17:24:09,929 INFO: Consumed a1og
17:24:10,26 INFO: Running something else
17:24:10,31 INFO: Consumed 204t
17:24:10,31 INFO: Consumed vmcg
17:24:10,31 INFO: Consumed f5jj
^C17:24:10,91 INFO: Received exit signal SIGINT...
17:24:10,91 INFO: Shutdown complete.
17:24:10,91 INFO: Cleaning up
```

Now running it will show...

As I forewarned: although the library handles the message leasing for us, it does so with threads running in the background. It introduces at least 15 threads...

In [None]:
async def watch_threads():
    while True:
        threads = threading.enumerate()
        logging.info(f'Current thread count: {len(threads)}')
        logging.info('Current threads:')
        for thread in threads:
            logging.info(f'-- {thread.name}')
        logging.info('Sleeping for 5 seconds...')
        await asyncio.sleep(5)

I'm going to reuse that little `run_something_else` coroutine to actually get some periodic stats on threads.

I'm also going to use a prefix so I can easily tell which threads I created versus others.

In [None]:
async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=5, thread_name_prefix='Mandrill')
    # <--snip-->

<pre><code>
15:35:39,693 INFO: Current thread count: 2
15:35:39,693 INFO: Current threads:
15:35:39,693 INFO: -- MainThread
15:35:39,693 INFO: -- Mandrill_0
15:35:39,693 INFO: Sleeping for 5 seconds...
15:35:44,697 INFO: Current thread count: 22
15:35:44,698 INFO: Current threads:
15:35:44,698 INFO: -- MainThread
15:35:44,698 INFO: -- Mandrill_X  <span class="asyncio-hl"><-- x5</span>
15:35:44,698 INFO: -- Thread-CallbackRequestDispatcher
15:35:44,698 INFO: -- Thread-ConsumeBidirectionalStream
15:35:44,698 INFO: -- Thread-1
15:35:44,698 INFO: -- Thread-LeaseMaintainer
15:35:44,698 INFO: -- Thread-2
15:35:44,698 INFO: -- Thread-Heartbeater
15:35:44,698 INFO: -- ThreadPoolExecutor-ThreadScheduler_X  <span class="asyncio-hl"><-- x10</span>
15:35:44,699 INFO: Sleeping for 5 seconds...
15:35:49,703 INFO: Current thread count: 22
15:35:49,704 INFO: Current threads:
15:35:49,704 INFO: -- MainThread
15:35:49,704 INFO: -- Mandrill_X  <span class="asyncio-hl"><-- x5</span>
15:35:49,704 INFO: -- Thread-CallbackRequestDispatcher
15:35:49,704 INFO: -- Thread-ConsumeBidirectionalStream
15:35:49,704 INFO: -- Thread-1
15:35:49,704 INFO: -- Thread-LeaseMaintainer
15:35:49,704 INFO: -- Thread-2
15:35:49,704 INFO: -- Thread-Heartbeater
15:35:49,704 INFO: -- ThreadPoolExecutor-ThreadScheduler_X  <span class="asyncio-hl"><-- x10</span>
15:35:49,705 INFO: Sleeping for 5 seconds...
15:35:54,707 INFO: Current thread count: 23
15:35:54,707 INFO: Current threads:
15:35:54,707 INFO: -- MainThread
15:35:54,707 INFO: -- Mandrill_X  <span class="asyncio-hl"><-- x5</span>
15:35:54,707 INFO: -- Thread-CallbackRequestDispatcher
15:35:54,707 INFO: -- Thread-ConsumeBidirectionalStream
15:35:54,707 INFO: -- Thread-1
15:35:54,707 INFO: -- Thread-LeaseMaintainer
15:35:54,707 INFO: -- Thread-2
15:35:54,708 INFO: -- Thread-Heartbeater
15:35:54,708 INFO: -- ThreadPoolExecutor-ThreadScheduler_X  <span class="asyncio-hl"><-- x10</span>
15:35:54,708 INFO: -- Thread-MonitorBatchPublisher
</code></pre>

We see we have the `MainThread`, which is the `asyncio` event loop. There are also five `Mandrill_`-prefixed threads that were created by our threadpool executor. There are five because we limited the number of workers when creating the executor. It looks as if the subscription client has its own threadpool executor named `ThreadPoolExecutor-ThreadScheduler`; `Thread-MonitorBatchPublisher` is from the publisher; and there's some gRPC/bidirectional streaming going on with the rest of the threads (heartbeater, lease maintainer, etc).
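The prefix trick is easy to try in isolation. Here's a small sketch that needs no Pub/Sub at all: it starts five workers with the `Mandrill` prefix and then separates them from every other thread in the process. The `time.sleep` jobs are only there to force all five workers to start, since the executor creates its threads lazily.

```python
import concurrent.futures
import threading
import time

executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=5, thread_name_prefix='Mandrill')

# Submit five overlapping jobs so that all five worker threads get created
futures = [executor.submit(time.sleep, 0.1) for _ in range(5)]

names = sorted(thread.name for thread in threading.enumerate())
mine = [name for name in names if name.startswith('Mandrill')]
others = [name for name in names if not name.startswith('Mandrill')]
print(f'Created by us: {mine}')
print(f'Everything else: {others}')

concurrent.futures.wait(futures)
executor.shutdown()
```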

All in all, though, the approach to threaded code isn't any different than the non-async code. 

## Calling async code from threads
<img src="images/surprised_mandrill_4.jpg" style="height:400px"></img>

Until you realize you need to call asynchronous code from a non-async function that's within a thread.

Obviously we can't just `ack` a message once we receive it. We need to restart the required host and save the message in our database. We have to somehow call asynchronous code from a non-async function, from a separate thread.

In [None]:
def callback(msg):
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')
    asyncio.create_task(handle_message(data))

This is pretty embarrassing, bear with me. Let's first attempt to use the `asyncio` API that we're familiar with, and update the callback function to create a task via `asyncio.create_task` from the `handle_message` coroutine we defined earlier:

<pre><code>
16:45:36,709 INFO: Running something else
16:45:36,833 INFO: Consumed es7s
<span class="asyncio-hl">16:45:36,833 ERROR: Top-level exception occurred in callback while processing a message</span>
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/ep18-37/lib/python3.7/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 63, in _wrap_callback_errors
    callback(message)
  File "examples/mandrill/mayhem_21.py", line 115, in callback
    asyncio.create_task(handle_message(data))
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/tasks.py", line 320, in create_task
    loop = events.get_running_loop()
</code></pre>

Ugh; sure, yes this of course makes sense. At this point, we're in another thread and there is no loop running for that thread.

In [None]:
def callback(msg):
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')
    current_thread = threading.current_thread()
    logging.info(f'Current thread: {current_thread.name}')
    try:
        loop = asyncio.get_running_loop()
        logging.info(f'Found loop: {loop}')
    except RuntimeError:
        logging.error('No event loop in thread')
    asyncio.create_task(handle_message(data))

To help show what thread we're in, let's add some log lines.

<pre><code>
17:35:11,785 INFO: Running something else
17:35:11,855 INFO: Running something else
17:35:11,856 INFO: Consumed xw6r
17:35:11,856 INFO: Current thread: ThreadPoolExecutor-ThreadScheduler_0
<span class="asyncio-hl">17:35:11,856 ERROR: No event loop in thread</span>
17:35:11,856 ERROR: Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
# snip
</code></pre>

Indeed, the current thread for the callback does not have a running event loop (I hear some eye rolls), as described in [the docs](https://docs.python.org/3/library/asyncio-eventloops.html#asyncio.get_running_loop). D'uh Lynn, read the freaking manual.
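You can reproduce this without the Pub/Sub library. In this sketch, the `callback` and the `outcome` list are made up for the demo; the callback runs in a worker thread from the loop's default executor and asks for the running loop.

```python
import asyncio
import threading


def callback(outcome):
    # Runs in a plain worker thread: no event loop is running here
    try:
        asyncio.get_running_loop()
        outcome.append('Found loop')
    except RuntimeError as e:
        outcome.append(f'{threading.current_thread().name}: {e}')


async def main():
    outcome = []
    # Fine here: we are in the thread that runs the event loop
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, callback, outcome)
    return outcome


outcome = asyncio.run(main())
print(outcome)
```

The loop is very much running, just not in the thread that the callback executes in.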

In [None]:
def consume_sync(loop):
    client = get_subscriber()
    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        loop.create_task(handle_message(pubsub_msg))
    client.subscribe(SUBSCRIPTION, callback)

What if we _gave_ it the loop we're using?

```
18:08:09,761 INFO: Running something else
18:08:09,826 INFO: Consumed 5236
18:08:09,826 INFO: Consumed 5237
18:08:09,827 INFO: Consumed 5238
18:08:09,827 INFO: Consumed 5239
18:08:09,828 INFO: Consumed 5240
18:08:10,543 INFO: Handling Message(inst_name='xbci')
18:08:10,543 INFO: Handling Message(inst_name='e8x5')
18:08:10,544 INFO: Handling Message(inst_name='shti')
18:08:10,544 INFO: Handling Message(inst_name='9yne')
18:08:10,544 INFO: Handling Message(inst_name='qgor')
18:08:10,544 INFO: Running something else
18:08:10,601 INFO: Saved Message(inst_name='shti') into database
18:08:10,721 INFO: Saved Message(inst_name='e8x5') into database
18:08:10,828 INFO: Saved Message(inst_name='xbci') into database
18:08:10,828 WARNING: Caught exception: Could not restart xbci.example.net
18:08:11,162 INFO: Saved Message(inst_name='9yne') into database
18:08:11,167 INFO: Running something else
18:08:11,481 INFO: Saved Message(inst_name='qgor') into database
18:08:11,549 INFO: Restarted e8x5.example.net
18:08:11,550 INFO: Restarted 9yne.example.net
18:08:11,550 INFO: Restarted qgor.example.net
18:08:11,674 INFO: Done. Acked 5240
18:08:11,821 INFO: Done. Acked 5236
18:08:12,108 INFO: Running something else
18:08:12,276 INFO: Done. Acked 5237
18:08:12,322 INFO: Running something else
18:08:12,510 INFO: Done. Acked 5239
18:08:12,549 INFO: Restarted shti.example.net
18:08:12,839 INFO: Running something else
18:08:12,841 INFO: Consumed 5241
18:08:12,842 INFO: Consumed 5242
18:08:12,842 INFO: Consumed 5243
18:08:12,843 INFO: Consumed 5244
18:08:12,843 INFO: Consumed 5245
18:08:13,153 INFO: Handling Message(inst_name='udtv')
18:08:13,154 INFO: Handling Message(inst_name='a75e')
18:08:13,154 INFO: Handling Message(inst_name='rvxb')
18:08:13,154 INFO: Handling Message(inst_name='ka9a')
18:08:13,154 INFO: Handling Message(inst_name='o7f2')
18:08:13,155 INFO: Done. Acked 5238
18:08:13,322 INFO: Saved Message(inst_name='rvxb') into database
18:08:13,477 INFO: Saved Message(inst_name='ka9a') into database
18:08:13,478 WARNING: Caught exception: Could not restart ka9a.example.net
^C18:08:13,506 INFO: Received exit signal SIGINT...
18:08:13,506 INFO: Shutdown complete.
18:08:13,506 INFO: Cleaning up
```

This is deceptive. We're lucky it works. Once we share some object between the threaded code in the callback, and the asynchronous code when handling the message, we see that we've shot ourselves in the foot. 

In [None]:
GLOBAL_QUEUE = asyncio.Queue()

# add to main loop to run
async def get_from_queue():
    while True:
        pubsub_msg = await GLOBAL_QUEUE.get()
        logging.info(f'Got {pubsub_msg.message_id} from queue')
        asyncio.create_task(handle_message(pubsub_msg))

# callback for consumer
async def add_to_queue(msg):
    logging.info(f'Adding {msg.message_id} to queue')
    await GLOBAL_QUEUE.put(msg)
    
def consume_sync(loop):
    client = get_subscriber()
    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        # returns an asyncio.Task obj; note that we're calling this
        # from the subscriber's thread, not the event loop's thread
        loop.create_task(add_to_queue(pubsub_msg))

    client.subscribe(SUBSCRIPTION, callback)

Let's create a shared queue that the consumer will add to, and then we read off of it to give to `handle_message`.

```
18:12:08,359 INFO: Consumed 5241
18:12:08,359 INFO: Consumed 5243
18:12:08,359 INFO: Consumed 5244
18:12:08,360 INFO: Consumed 5245
18:12:08,360 INFO: Consumed 5242
18:12:08,414 INFO: Consumed 5246
18:12:08,415 INFO: Consumed 5247
18:12:08,415 INFO: Consumed 5248
18:12:08,415 INFO: Consumed 5249
18:12:08,416 INFO: Consumed 5250
18:12:08,821 INFO: Adding 5241 to queue
18:12:08,821 INFO: Adding 5243 to queue
18:12:08,822 INFO: Adding 5244 to queue
18:12:08,822 INFO: Adding 5245 to queue
18:12:08,822 INFO: Adding 5242 to queue
18:12:08,822 INFO: Adding 5246 to queue
18:12:08,822 INFO: Adding 5247 to queue
18:12:08,822 INFO: Adding 5248 to queue
18:12:08,822 INFO: Adding 5249 to queue
18:12:08,822 INFO: Adding 5250 to queue
18:12:13,403 INFO: Consumed 5251
18:12:13,404 INFO: Consumed 5252
18:12:13,404 INFO: Consumed 5253
18:12:13,404 INFO: Consumed 5254
18:12:13,404 INFO: Consumed 5255
18:12:13,875 INFO: Adding 5251 to queue
18:12:13,876 INFO: Adding 5252 to queue
18:12:13,876 INFO: Adding 5253 to queue
18:12:13,876 INFO: Adding 5254 to queue
18:12:13,876 INFO: Adding 5255 to queue
^C18:12:14,896 INFO: Received exit signal SIGINT...
18:12:14,896 INFO: Shutdown complete.
18:12:14,896 INFO: Cleaning up
```

Running it, we see something funky. The log line `logging.info(f'Got {pubsub_msg.message_id} from queue')` never shows; it looks like we never consume from our global queue.

In [None]:
async def add_to_queue(msg):
    logging.info(f'Adding {msg.message_id} to queue')
    await GLOBAL_QUEUE.put(msg)
    logging.info(f'Current queue size: {GLOBAL_QUEUE.qsize()}')

If we add a line in our `add_to_queue` coroutine to see the queue size:

```
18:17:09,537 INFO: Adding 5271 to queue
18:17:09,537 INFO: Current queue size: 1
18:17:09,537 INFO: Adding 5272 to queue
18:17:09,537 INFO: Current queue size: 2
18:17:09,537 INFO: Adding 5273 to queue
18:17:09,537 INFO: Current queue size: 3
18:17:09,537 INFO: Adding 5274 to queue
18:17:09,537 INFO: Current queue size: 4
18:17:09,537 INFO: Adding 5275 to queue
18:17:09,537 INFO: Current queue size: 5
18:17:14,572 INFO: Adding 5276 to queue
18:17:14,572 INFO: Current queue size: 6
18:17:14,572 INFO: Adding 5277 to queue
18:17:14,572 INFO: Current queue size: 7
18:17:14,572 INFO: Adding 5278 to queue
18:17:14,572 INFO: Current queue size: 8
18:17:14,572 INFO: Adding 5279 to queue
18:17:14,572 INFO: Current queue size: 9
18:17:14,572 INFO: Adding 5280 to queue
18:17:14,572 INFO: Current queue size: 10
^C18:17:16,899 INFO: Received exit signal SIGINT...
18:17:16,899 INFO: Shutdown complete.
18:17:16,899 INFO: Cleaning up
```

We can see that the queue is ever-growing, and in fact we're not reading from it.

I'm sure a lot of you see what's going on here. We're not thread safe. /facepalm/

In [None]:
def callback(loop, pubsub_msg):
    logging.info(f'Consumed {pubsub_msg.message_id}')
    coro = add_to_queue(pubsub_msg)
    # returns a concurrent.futures.Future obj
    fut = asyncio.run_coroutine_threadsafe(coro, loop)
    fut.cancel()

Let's make use of `asyncio.run_coroutine_threadsafe` to see what happens:

```
20:46:59,144 INFO: Running something else
20:46:59,209 INFO: Consumed 6806
20:46:59,210 INFO: Consumed 6835
20:46:59,210 INFO: Adding 6806 to queue
20:46:59,210 INFO: Current queue size: 1
20:46:59,210 INFO: Adding 6835 to queue
20:46:59,210 INFO: Current queue size: 2
20:46:59,211 INFO: Got 6806 from queue
20:46:59,211 INFO: Got 6835 from queue
20:46:59,211 INFO: Consumed 6834
20:46:59,211 INFO: Handling Message(inst_name='mbab')
20:46:59,212 INFO: Consumed 6823
20:46:59,212 INFO: Handling Message(inst_name='tekn')
20:46:59,212 INFO: Consumed 6822
20:46:59,212 INFO: Adding 6834 to queue
20:46:59,213 INFO: Consumed 6825
20:46:59,213 INFO: Current queue size: 1
20:46:59,213 INFO: Consumed 6828
20:46:59,214 INFO: Adding 6823 to queue
20:46:59,214 INFO: Consumed 6829
20:46:59,214 INFO: Current queue size: 2
20:46:59,214 INFO: Consumed 6826
20:46:59,215 INFO: Got 6834 from queue
20:46:59,215 INFO: Got 6823 from queue
20:46:59,215 INFO: Adding 6822 to queue
20:46:59,215 INFO: Current queue size: 1
20:46:59,215 INFO: Handling Message(inst_name='prgs')
20:46:59,216 INFO: Handling Message(inst_name='ifoc')
20:46:59,216 INFO: Adding 6825 to queue
20:46:59,216 INFO: Current queue size: 2
20:46:59,216 INFO: Consumed 6832
20:46:59,216 INFO: Adding 6828 to queue
20:46:59,216 INFO: Consumed 6833
```
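Here's a self-contained sketch of that same pattern with the Pub/Sub pieces removed. The `threaded_callback` function and the message IDs are stand-ins; the queue is created inside the coroutine and passed around rather than kept as a global. The part that matters is that the non-loop thread hands its coroutine to the loop through `asyncio.run_coroutine_threadsafe`.

```python
import asyncio


async def add_to_queue(queue, msg_id):
    await queue.put(msg_id)


def threaded_callback(loop, queue, msg_ids):
    # Runs in a non-loop thread, like the subscriber's callback
    for msg_id in msg_ids:
        # Returns a concurrent.futures.Future, which is safe to wait on here
        fut = asyncio.run_coroutine_threadsafe(
            add_to_queue(queue, msg_id), loop)
        fut.result(timeout=1)  # block this thread until the item is queued


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    msg_ids = [5241, 5242, 5243]
    # Run the callback in a worker thread; the loop stays free meanwhile
    await loop.run_in_executor(None, threaded_callback, loop, queue, msg_ids)
    return [queue.get_nowait() for _ in msg_ids]


received = asyncio.run(main())
print(received)
```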

<h1><span style="font-size:50%;font-style:italic;color:gray;">TL;DR: Making synchronous code <code>asyncio</code>-friendly</span></h1>

* Simple to get around synchronous code with `ThreadPoolExecutor`

* Threads & `asyncio` - use `_threadsafe` API

In my opinion, it isn't difficult to work with synchronous code with `asyncio`.

However, it _is_ difficult to work with threads, particularly with asyncio. If you must, use the `_threadsafe` APIs that `asyncio` gives you.
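The other `_threadsafe` API worth knowing is `loop.call_soon_threadsafe`, which schedules a plain callable (not a coroutine) on the loop from another thread. A minimal sketch, with a made-up `worker` function feeding a queue:

```python
import asyncio
import threading


def worker(loop, queue, done):
    # We're in a non-loop thread, so every touch of loop-owned objects
    # goes through call_soon_threadsafe
    for item in ('a', 'b'):
        loop.call_soon_threadsafe(queue.put_nowait, item)
    loop.call_soon_threadsafe(done.set)


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    done = asyncio.Event()
    thread = threading.Thread(target=worker, args=(loop, queue, done))
    thread.start()
    await done.wait()  # set from the worker thread, via the loop
    thread.join()
    return [queue.get_nowait() for _ in range(queue.qsize())]


items = asyncio.run(main())
print(items)
```

Unlike a bare `loop.call_soon` or `loop.create_task` from the thread, this wakes the loop up, so the callbacks run right away instead of whenever the loop next happens to come around.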

<img src="images/mandrill_backpack.jpg"/>

But if you can, just hide away somewhere where they can't get you.

# Thank you!
<h3>rogue.ly/aio</h3>
<h3> Lynn Root <span style="color:rgb(25,230,140)">|</span> Spotify <span style="color:rgb(25,230,140)">|</span> @roguelynn</h3>

So in essence, this talk is something I would have liked a year ago; so I'm speaking to past Lynn here. But I'm hoping there are others that benefit from a use case that's not a web crawler. 

Thanks!