## Coroutines for IO-bound tasks

In this notebook, we'll weave together our new [Tweet Parser](https://github.com/tw-ddis/tweet_parser) and some Python asyncio magic.

Let's set up the environment and demonstrate a motivating example.

In [1]:
from IPython.display import HTML
HTML('<iframe width="560" height="315" src="https://www.youtube.com/embed/dD9NgzLhbBM" frameborder="0" allowfullscreen></iframe>')

In [3]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import itertools as it
from functools import partial

import seaborn as sns
import pandas as pd
import requests
from tweet_parser.tweet import Tweet

import sec # you will not have this python file; I use it to keep `secrets` like passwords hidden



We can define a few constants here that will be used throughout our example.

In [5]:
username = "agonzales@twitter.com"
AUTH = requests.auth.HTTPBasicAuth(username, sec.GNIP_API_PW)
GNIP_BASE_URL = "https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?"

This function is a little helper for programmatically generating valid queries for terms with the Gnip API.



In [6]:
def gen_query_url(url, terms, max_results=100):
    if isinstance(terms, str):
        terms = terms.split()
    return ''.join([url,
                    "query=",
                    "%20".join(terms),
                    "&maxResults={}".format(max_results)])
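`gen_query_url` joins the terms with `%20` but doesn't escape anything else, so a term such as `#eclipse` or `from:nasa` would go into the URL raw (and a raw `#` starts a URL fragment, which is not sent to the server). A hedged alternative, assuming the endpoint accepts standard percent-encoding, is to let `urllib.parse.urlencode` build the query string. `gen_query_url_encoded` is a hypothetical name and `https://example.com/search.json?` is a placeholder, not the Gnip endpoint.

```python
from urllib.parse import quote, urlencode


def gen_query_url_encoded(url, terms, max_results=100):
    """Hypothetical variant of gen_query_url that percent-encodes its parameters."""
    if isinstance(terms, str):
        terms = terms.split()
    params = {"query": " ".join(terms), "maxResults": max_results}
    # quote_via=quote encodes spaces as %20; urlencode's default (quote_plus) would use '+'
    return url + urlencode(params, quote_via=quote)


base = "https://example.com/search.json?"  # placeholder base URL
print(gen_query_url_encoded(base, ["just", "bought", "a", "house"]))
# https://example.com/search.json?query=just%20bought%20a%20house&maxResults=100
print(gen_query_url_encoded(base, "#eclipse from:nasa"))
# https://example.com/search.json?query=%23eclipse%20from%3Anasa&maxResults=100
```

For plain alphanumeric terms this produces the same URL as `gen_query_url`; it only differs when a term contains characters that need escaping.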

Let's say you want to get a collection of tweets matching some criteria, an extremely common task. The process might look something like this:

In [7]:
query = gen_query_url(GNIP_BASE_URL, ["just", "bought", "a", "house"])
print(query)

https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=just%20bought%20a%20house&maxResults=100


In [8]:
import requests

def sync_tweets(query):
    return requests.get(url=query, auth=AUTH).json()['results']

In [9]:
%%time
tweets = [Tweet(i) for i in sync_tweets(query)]

CPU times: user 43.9 ms, sys: 8.19 ms, total: 52.1 ms
Wall time: 805 ms


In [10]:
print(tweets[0].text)

Jesus, my best friend in London just bought a house with her boyfriend why am I feeling so olddd😭


Easy peasy. But what if you have a bunch of queries to match? (This is a bit contrived, but it serves a purpose.) You might define all your queries like this and loop over them, running each query in turn.

In [11]:
formed_query = partial(gen_query_url, url=GNIP_BASE_URL, max_results=100)
queries = [formed_query(terms=[i]) for i in ["eclipse", "nuclear", "korea", "cats", "ai", "memes", "googlebro"]]
queries

['https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=eclipse&maxResults=100',
 'https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=nuclear&maxResults=100',
 'https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=korea&maxResults=100',
 'https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=cats&maxResults=100',
 'https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=ai&maxResults=100',
 'https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=memes&maxResults=100',
 'https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?query=googlebro&maxResults=100']

In [12]:
%%time
tweets = [Tweet(i) for i in it.chain.from_iterable([sync_tweets(query) for query in queries])]

CPU times: user 335 ms, sys: 28.1 ms, total: 363 ms
Wall time: 4.38 s


Works great, but notice that the wall time scales roughly linearly with the number of queries: seven queries took about 4.4 s, versus about 0.8 s for one. Given that this is a trivial amount of _computation_ and the task is almost entirely spent waiting on system calls / IO, it's a perfect opportunity to add parallelism to the mix and speed it up.
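To see where the linear scaling comes from without hitting the network, here is a small simulation. `fake_sync_fetch` is a hypothetical stand-in for `sync_tweets` that just sleeps for a fixed, assumed latency (real Gnip latency varies). The comprehension in the cell above behaves like the explicit loop below: each call blocks until its response arrives, so the total is roughly the number of queries times the per-request latency.

```python
import itertools as it
import time

LATENCY = 0.1  # assumed per-request round-trip time, in seconds


def fake_sync_fetch(query):
    """Hypothetical stand-in for sync_tweets: blocks, then returns fake results."""
    time.sleep(LATENCY)  # the thread sits idle while "waiting on the network"
    return [f"{query}-result-{i}" for i in range(3)]


def collect_sequentially(queries):
    per_query = []
    for query in queries:  # one request at a time; each waits for the previous one
        per_query.append(fake_sync_fetch(query))
    # flatten the per-query lists, as it.chain.from_iterable does in the cell above
    return list(it.chain.from_iterable(per_query))


fake_queries = ["eclipse", "nuclear", "korea", "cats", "ai"]
start = time.perf_counter()
flat_results = collect_sequentially(fake_queries)
elapsed = time.perf_counter() - start
print(len(flat_results), f"{elapsed:.2f}s")  # about len(fake_queries) * LATENCY
```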

IO-bound parallelism is commonly handled with a technique called asynchronous programming, which introduces concepts such as the _coroutine_, _event loop_, _user-level thread_, _task_, and _future_.
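Several of these terms map directly onto `asyncio` objects. The toy sketch below (the names `wait_for_answer` and `main` are made up for illustration) uses `asyncio.run`, `asyncio.get_running_loop`, and `asyncio.create_task`, which need Python 3.7+: the event loop runs `main`; `loop.create_future()` makes a bare _future_ whose result a callback fills in later; and `asyncio.create_task` wraps a _coroutine_ in a _task_ that the loop runs on its own schedule. In a recent Jupyter kernel, which already runs an event loop, `asyncio.run` will refuse to start, so use a top-level `await main()` there instead.

```python
import asyncio


async def wait_for_answer(future):
    # a coroutine: it suspends at `await` until the future has a result
    return await future


async def main():
    loop = asyncio.get_running_loop()  # the event loop currently running this coroutine
    future = loop.create_future()      # a placeholder for a result that doesn't exist yet
    task = asyncio.create_task(wait_for_answer(future))  # schedule the coroutine as a task
    loop.call_later(0.05, future.set_result, 42)  # the loop fills in the future in ~50 ms
    print(task.done())                 # False: the task hasn't finished yet
    answer = await task                # suspend main until the task completes
    print(task.done(), answer)         # True 42
    return answer


result = asyncio.run(main())
```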

In modern Python (3.5+), the language has native support for coroutines, exposed via the `asyncio` module and the keywords `async` and `await`. Several libraries build on coroutines internally, such as `aiohttp`, whose client side is roughly a coroutine version of `requests`.
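The key behavior of `async def` is that calling the function does not run its body: it returns a coroutine object, and nothing happens until something awaits it or hands it to the event loop. That is why `fetch_tweets_coroutine(query)` below is passed to `loop.run_until_complete` rather than simply called. A minimal sketch with a hypothetical `greet` coroutine (`asyncio.run` needs Python 3.7+):

```python
import asyncio
import inspect

calls = []


async def greet(name):
    calls.append(name)      # side effect shows when the body actually runs
    await asyncio.sleep(0)  # hand control back to the event loop once
    return f"hello, {name}"


coro = greet("gnip")                     # creates a coroutine object; the body has NOT run
print(inspect.iscoroutine(coro), calls)  # True []
message = asyncio.run(coro)              # the event loop drives the coroutine to completion
print(message, calls)                    # hello, gnip ['gnip']
```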


Let's look at what the basic coroutine version of our simple example above looks like in aiohttp:

In [13]:
import asyncio
import aiohttp


async def fetch_tweets_coroutine(url):
    # open a session, send one authenticated GET, and decode the JSON body
    async with aiohttp.ClientSession() as session:
        async with session.get(url, auth=aiohttp.BasicAuth(AUTH.username, AUTH.password)) as response:
            return await response.json()


In [14]:
%%time
loop = asyncio.get_event_loop()
tweets = [Tweet(i) for i in loop.run_until_complete(fetch_tweets_coroutine(query))['results']]

CPU times: user 40.3 ms, sys: 4.15 ms, total: 44.5 ms
Wall time: 672 ms


In [19]:
print(tweets[0].user_id, tweets[0].text)

2701385078 Jesus, my best friend in London just bought a house with her boyfriend why am I feeling so olddd😭


It's a lot more code than our simple requests example and it isn't any faster, but that is expected: with a single request, nearly all of the wall time is the round trip to and from Gnip, and the program has nothing else to do while it waits.
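Coroutines only pay off when several requests are in flight at once. Awaiting them one after another is still sequential; the speedup comes from scheduling them together, for example with `asyncio.gather`. A sketch with a hypothetical `fake_fetch` that simulates the HTTP round trip with `asyncio.sleep` and an assumed fixed latency:

```python
import asyncio
import time

LATENCY = 0.1  # assumed round-trip time per request, in seconds


async def fake_fetch(query):
    """Hypothetical stand-in for an aiohttp request."""
    await asyncio.sleep(LATENCY)  # yields to the event loop while "waiting on the network"
    return f"{query}-results"


async def sequential(queries):
    # each await finishes before the next request starts: ~len(queries) * LATENCY
    return [await fake_fetch(q) for q in queries]


async def concurrent(queries):
    # all requests are scheduled together, so their waits overlap: ~LATENCY in total
    return await asyncio.gather(*(fake_fetch(q) for q in queries))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


fake_queries = ["eclipse", "nuclear", "korea", "cats", "ai"]
seq_results, seq_time = timed(sequential(fake_queries))
con_results, con_time = timed(concurrent(fake_queries))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

Both versions return the same results; only the second lets the waits overlap.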

Let's try again with our longer set of queries, redefining the functions to handle this more naturally: one shared session, with every request scheduled up front.

In [20]:
async def fetch_tweets_fancy(session, url):
    # reuse the caller's session so every request shares one connection pool
    async with session.get(url, auth=aiohttp.BasicAuth(AUTH.username, AUTH.password)) as response:
        _json = await response.json()
        return [Tweet(t) for t in _json["results"]]


async def collect_queries(queries):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for query in queries:
            # wrap each request in a task so the event loop can run them all concurrently
            task = asyncio.ensure_future(fetch_tweets_fancy(session, query))
            tasks.append(task)
        # wait for every task; results come back in the same order as `queries`
        responses = await asyncio.gather(*tasks)
        return responses
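`asyncio.gather` returns results in the order the awaitables were passed in, not the order in which they finish, which is why the flattened `res` lines up with `queries`. A self-contained check with a hypothetical `fake_fetch_tweets` whose delays are deliberately reversed, so the first query is the slowest:

```python
import asyncio

finish_order = []


async def fake_fetch_tweets(query, delay):
    """Hypothetical stand-in for fetch_tweets_fancy; `delay` simulates network latency."""
    await asyncio.sleep(delay)
    finish_order.append(query)  # record the order in which requests complete
    return [f"{query}-tweet-{i}" for i in range(2)]


async def collect_fake(queries):
    # give the first query the longest delay and the last query the shortest
    delays = [0.05 * (len(queries) - i) for i in range(len(queries))]
    tasks = [asyncio.ensure_future(fake_fetch_tweets(q, d)) for q, d in zip(queries, delays)]
    return await asyncio.gather(*tasks)


responses = asyncio.run(collect_fake(["eclipse", "nuclear", "korea"]))
print(finish_order)               # ['korea', 'nuclear', 'eclipse']  (completion order)
print([r[0] for r in responses])  # ['eclipse-tweet-0', 'nuclear-tweet-0', 'korea-tweet-0']  (input order)
```

By default, if one awaitable raises, `gather` propagates that exception to the caller; passing `return_exceptions=True` instead puts the exception object in that slot of the result list, which can be handy when one failed query shouldn't spoil the rest.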

In [21]:
formed_query = partial(gen_query_url, url=GNIP_BASE_URL, max_results=100)
queries = [formed_query(terms=[i]) for i in ["eclipse", "nuclear", "korea", "cats", "ai", "memes"]]

In [22]:
%%time
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(collect_queries(queries))
res = list(it.chain.from_iterable(loop.run_until_complete(future)))

CPU times: user 135 ms, sys: 12.3 ms, total: 147 ms
Wall time: 758 ms


In [24]:
print(res[0].text)
print(len(res))

RT @surly_viking: BAHAHAHAHA!!! Mom Asks To ‘Reschedule’ Eclipse Because It’s On A School Day, Internet Can’t Stop Laughing https://t.co/Cq…
600


So, what the hell is a coroutine and why does it work the way that it does?


First, we have to talk about the differences between threads, coroutines, parallelism, and concurrency.
For a short intro, see [this SO thread](https://stackoverflow.com/questions/1934715/difference-between-a-coroutine-and-a-thread).

- Concurrency - structuring a program as separate tasks whose execution can overlap or interleave (IDEs, operating systems, complex operations)

- Parallelism - executing multiple tasks simultaneously to increase speed

- Thread - scheduled by the OS kernel, with preemption; brings blocking, context switching, deadlocks, and lock contention

- User-level thread (ULT) / coroutine - non-blocking, scheduled by the program rather than the kernel, event-based; one thread "juggles" many tasks
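The "juggling" in the last bullet can be made concrete without `asyncio` at all. Below is a toy, hypothetical round-robin scheduler in which plain Python generators act as coroutines: each `yield` voluntarily hands control back, and a single thread interleaves the tasks. It is only an illustration of cooperative scheduling, not how `asyncio` is implemented (its event loop resumes coroutines when their IO or timers are ready, rather than blindly in turn). Nothing preempts a task here; one that never yields would starve the others, which is the cooperative trade-off compared with OS threads.

```python
from collections import deque


def task(name, steps, log):
    """A generator used as a coroutine: each `yield` gives control back to the scheduler."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # pause here; the scheduler decides who runs next


def run_round_robin(tasks):
    """Toy scheduler: resume each ready task in turn until all are exhausted."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)      # run the task until its next `yield`
        except StopIteration:
            continue           # task finished; drop it
        ready.append(current)  # still alive; put it back in line


log = []
run_round_robin([task("a", 3, log), task("b", 2, log)])
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1', 'a:2']
```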


(discussion)


Further reading:

- [Coroutines in Node.js and Python](http://sahandsaba.com/understanding-asyncio-node-js-python-3-4.html)

- [Combinatorics and coroutines](http://sahandsaba.com/combinatorial-generation-using-coroutines-in-python.html)

- [Long example of Python internals / building coroutines](http://www.aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html)

- [Making 1 million requests with aiohttp](https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html)

- [Coroutine patterns](https://medium.com/python-pandemonium/asyncio-coroutine-patterns-beyond-await-a6121486656f)





