In [1]:
import asyncio         # for handling asynchronous tasks
import aiohttp         # for making asynchronous http requests (requires asyncio)
import requests        # for making synchronous requests
import scraper         # from homework 1
import csv             # for csv parsing
import time            # for timing our implementations

from concurrent.futures import ProcessPoolExecutor # for parallelizing work

## Pre-requisites

This tutorial requires Python 3.5 or later, since the `async` and `await` keywords were introduced in Python 3.5 and aren't available in Python 2.

To run Jupyter with Python3, enter into your terminal:
```
> pip3 install jupyter
> jupyter kernelspec install
```

To run locally, you'll also need a modified `scraper.py` from Homework 1 (some of the Yelp HTML has changed) and `restaurants.csv`, a list of Yelp business urls.

## Generating the data

If you'd like, you can generate `restaurants.csv` yourself; just change `config_filepath` to point to your Yelp API keys:

In [2]:
# change to the filepath containing your API keys
config_filepath = "yelp_api.json"

# feel free to change to your liking:
output_filepath = "restaurants.csv"
restaurant_queries = ["San Francisco"]
restaurants = []

# make the calls to Yelp API
yelp_client = scraper.authenticate(config_filepath)
for query in restaurant_queries:
    restaurants += scraper.all_restaurants(yelp_client, query)
    
print(type(restaurants[0]))
print(len(restaurants))

<class 'yelp.obj.business.Business'>
1000


Run the cell below to write the data to a file, so you don't have to regenerate every time.

In [3]:
# extract urls from each business object
urls = [r.url for r in restaurants]

# write the results to a csv file
with open(output_filepath, "w") as f:
    w = csv.writer(f, lineterminator='\n')
    for url in urls:
        w.writerow([url])

## Loading the data from file

If you'd prefer not to generate the data, run the cell below to read from `restaurants.csv`.

In [4]:
with open("restaurants.csv", "r") as f:
    r = csv.reader(f)
    urls = [row[0] for row in r]
    
print(urls[0])
print(len(urls))

https://www.yelp.com/biz/dakshin-san-francisco-6?adjust_creative=xkxIFbCrCF3OVf-SDbQRWA&utm_campaign=yelp_api&utm_medium=api_v2_search&utm_source=xkxIFbCrCF3OVf-SDbQRWA
1000


## Introduction

One of the challenges with data science is dealing with large amounts of data efficiently. Suppose, for example, that you are given a list of 1,000 different Yelp restaurants, and are tasked with scraping all of their reviews and ratings. How would you approach this problem?

## Naive approach

Based on what we've covered thus far, a reasonable approach might be:

- loop through urls:
    - while there's a next page:
        - use `requests` to grab the HTML
        - run the HTML through `beautifulsoup`, process the data, etc.
        - update the `url` to point to the next page

The following demonstrates this approach:

In [6]:
def parse_page (html):
    """
    Given the HTML for a yelp page, do some processing like parsing the reviews on that page, 
    finding the next page of reviews, etc.
    
    Inputs:
        html (string): HTML of a yelp page
    Outputs:
        (list, string): list of reviews, url of next page
    """
    # simulate long-running processing
    time.sleep(0.2)    
    return scraper.parse_page(html)

def fetch_url (url):
    """
    Given a yelp business url, aggregate all the reviews for that business.
    
    Inputs:
        url (string): the yelp url
    Outputs:
        (list): all the reviews for that business
    """
    parsed_reviews = []
    
    # 2. while we still have url to parse
    while url:
        # 3. use requests to grab the HTML
        res = requests.get(url)
        html = res.text
        
        # 4,5. parse with beautifulsoup, update url
        reviews, url = parse_page(html)
        parsed_reviews += reviews
        
    return parsed_reviews
        

def get_reviews (urls):
    """
    Given a list of yelp business urls, aggregate the reviews of all the businesses.
    
    Inputs:
        urls (list): list of valid yelp business urls
    Outputs:
        (list): list of reviews
    """
    parsed_reviews = []
    
    # 1. loop through websites
    for url in urls:
        reviews = fetch_url(url)
        parsed_reviews += reviews

    return parsed_reviews

Indeed, if you were to use only the API exposed by Homework 1, this would be precisely how you would do it.

## What's the problem?

To get a rough idea of how it behaves, let's try to time how long it takes to process just 5 urls:

In [7]:
start = time.time()

results = get_reviews(urls[:5])

end = time.time()

print(len(results))
print("Took: {}s".format(end - start))

3650
Took: 548.1295688152313s


For just 5 urls, we spent something in the neighborhood of 550s, or 9 minutes. This is because `requests.get` is synchronous, meaning a second request cannot happen until the previous request has finished. Further, the way we're handling our 200ms computation (simulated by `time.sleep`) blocks our program on every request, which obviously isn't ideal.

What if data processing took more than 200ms per page? What if we had 10,000 instead of 1,000 urls? Hopefully, it's becoming clear that doing work sequentially is unviable as we begin to scale the system.
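
As a rough back-of-the-envelope check, the timing above can be extrapolated linearly. This is only a sketch: it assumes every url costs about as much as the first five did, which may not hold since businesses have different numbers of review pages.

```python
# Linear extrapolation from the measured run above (5 urls in ~548s).
# Assumption: every url costs about the same as the first five did.
measured_seconds = 548.13
measured_urls = 5

seconds_per_url = measured_seconds / measured_urls

for total_urls in (1000, 10000):
    estimated_hours = seconds_per_url * total_urls / 3600
    print("{} urls: ~{:.0f} hours".format(total_urls, estimated_hours))
```

Under that assumption, 1,000 urls would take roughly 30 hours of purely sequential work.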

## What's the fix?

Looking back, the main culprits are:

**1.** requests are blocking, so we spend a lot of time waiting  
**2.** requesting and processing happen sequentially, so we're not parallelizing work at all

We spend a lot of time waiting, when there's little reason to do the work in-order: if the 100th request responds before the 1st, we could begin processing the response immediately, since we don't actually need the 1st response to know how to parse the 100th. In other words, we should decouple *requesting* and *processing* data.

In this tutorial, we'll discuss two general ideas:

**1.** asynchronous IO  
**2.** multiprocessing to decouple requesting and processing

Let's walk through each in order.

## Idea 1 - Asynchronous IO

"Asynchronous request" is just a fancy name for a request that doesn't block while it waits for a response. Instead, when an asynchronous function waits for IO, your program can temporarily run some other code, and later resume execution of the function once the response arrives.

To illustrate, observe the results of sending 3 synchronous vs. asynchronous requests (don't worry about the code just yet; just note the order of the printed messages):

In [10]:
url = "https://google.com"
num_requests = 3

# synchronous example
def synchronous_requests ():
    for i in range(num_requests):
        print("Starting {}".format(i))
        res = requests.get(url)
        print("Got back {}".format(i))

# asynchronous example
async def fetch_async (session, i):
    print("Starting {}".format(i))
    async with session.get(url) as resp:
        res = await resp.read()
        print("Got back {}".format(i))
        return res

async def asynchronous_requests ():
    tasks = []
    
    async with aiohttp.ClientSession() as session:
        for i in range(num_requests):
            task = asyncio.ensure_future(fetch_async(session, i))
            tasks.append(task)
    
        return await asyncio.gather(*tasks)
    
print("Synchronous example:")
synchronous_requests()

print("Asynchronous example:")
loop = asyncio.get_event_loop()
fut = asyncio.ensure_future(asynchronous_requests())
res = loop.run_until_complete(fut)
print(len(res))

Synchronous example:
Starting 0
Got back 0
Starting 1
Got back 1
Starting 2
Got back 2
Asynchronous example:
Starting 0
Starting 1
Starting 2
Got back 0
Got back 2
Got back 1
3


Note how asynchronous requests are made back-to-back, and responses are processed in order of arrival (which might not match the order that they were sent).

What `asyncio` is doing underneath the hood is quite interesting, but in order to understand how it works, we have to first define some terms.

### Event loop

In order to run functions asynchronously, the `asyncio` library actually maintains a loop that repeatedly executes tasks in an internal queue. This is known as the event loop.

Whenever a task needs to wait for IO, it tells the scheduler to "pause" it and schedule another task in the meantime. More technically, the task "yields" execution and is taken out of the queue. Once the IO finishes, the scheduler reschedules it (puts it back in the queue) and eventually "resumes" execution of the task. At any point in time, only one task gets executed, but because each task yields instead of blocks, wait-time is minimized.

This cooperative yielding between tasks allows for execution of non-blocking IO on a single thread. Due to the nature of cooperative multitasking, even if tasks are initiated in a certain order, the order in which they are rescheduled depends on how other tasks yield, which explains the seemingly arbitrary ordering of the "Got back" messages.
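
The interleaving described above can be observed without any network access. In this minimal sketch, `asyncio.sleep` stands in for IO, and `worker`, `events`, and the delays are made-up names and values for illustration. It uses `asyncio.run`, which needs Python 3.7+ and is newer than the `get_event_loop` style used elsewhere in this tutorial.

```python
import asyncio

events = []  # records the order in which things happen

async def worker(name, delay):
    events.append("start " + name)
    # asyncio.sleep stands in for IO: it yields to the event loop while waiting
    await asyncio.sleep(delay)
    events.append("done " + name)

async def main():
    # all three workers are scheduled together; the loop interleaves them
    await asyncio.gather(
        worker("a", 0.3),
        worker("b", 0.1),
        worker("c", 0.2),
    )

asyncio.run(main())
print(events)
```

All three workers start before any of them finishes, and they finish in order of their delays (`b`, `c`, `a`) rather than in the order they were started.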

### Coroutines

The aforementioned functions that "yield" execution are called coroutines (it might be helpful to think of them as "cooperative" routines). Every time a coroutine calls `await` on some IO, it yields execution until the IO finishes, after which it can resume execution where it left off. Note that `await` can only be used in coroutines (declared with `async def`), since regular functions have no concept of yielding.

For example, one might translate `await asyncio.gather(*tasks)` as: "yield execution of this function until `asyncio.gather` resolves to a value, after which continue running this function where we left off".
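
One consequence worth seeing in code: calling a coroutine function does not run its body. The sketch below uses a hypothetical `greet` coroutine and assumes Python 3.7+ for `asyncio.run`.

```python
import asyncio

log = []

async def greet(name):
    log.append("running " + name)
    await asyncio.sleep(0)   # yield to the event loop once
    return "hello " + name

# calling a coroutine function only builds a coroutine object; the body has not run
coro = greet("yelp")
is_coroutine = asyncio.iscoroutine(coro)
log_before = list(log)       # snapshot: nothing has been logged yet

# the body only runs once the event loop drives the coroutine
result = asyncio.run(coro)
print(is_coroutine, log_before, log, result)
```

The log stays empty until the event loop actually runs the coroutine, which is why a coroutine has to be awaited or scheduled to have any effect.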

### Futures

In order for a coroutine to know when IO has finished, it uses something called Futures. Futures are objects that can "resolve" to a value at a later point in time. Before a Future is resolved, it acts as a temporary placeholder for the eventual result.

In the example above, `asyncio.gather(*tasks)` returns a Future object that later resolves to the real return value of `asyncio.gather`. When we `await` this Future, we yield until it resolves to a value, and only then do we return.
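
To make the "placeholder" idea concrete, here is a small sketch that creates a bare Future by hand and resolves it a moment later. The 0.1s timer and the `"response body"` value are invented for illustration, and `asyncio.run` / `get_running_loop` assume Python 3.7+.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()

    # a bare Future: a placeholder with no value yet
    fut = loop.create_future()
    pending_before = not fut.done()

    # pretend some IO finishes 0.1s from now and delivers a value
    loop.call_later(0.1, fut.set_result, "response body")

    # yield to the event loop until someone calls set_result
    value = await fut
    return pending_before, fut.done(), value

state = asyncio.run(main())
print(state)
```

The coroutine is suspended at `await fut` and only resumes once `set_result` has been called on the Future.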

### Tasks

A Task is just a subclass of Future that waits for the result of a coroutine. In other words, a Task resolves to a value only when the coroutine it wraps finally returns. In this case, `asynchronous_requests` is a coroutine which we wrap as a Task using `ensure_future` (discussed later).

### Step 1.1 - Event loop

We first define two coroutines called `fetch_async` and `asynchronous_requests` with the `async def` keywords:

In [None]:
async def fetch_async (session, i):
    # omitted for brevity
    pass
    
async def asynchronous_requests ():
    # omitted for brevity
    pass

Then, we run our `asynchronous_requests` coroutine by putting it on the event loop:

In [None]:
# get the default event loop
loop = asyncio.get_event_loop()

# make sure that we have a future-like object. anything that goes into the 
# event loop has to know how to be suspended and resolved at a later time.
fut = asyncio.ensure_future(asynchronous_requests())

# run our task until it resolves to a value (i.e the coroutine finally returns)
loop.run_until_complete(fut)

The `ensure_future` function can be a bit obscure. Essentially, it takes a Future or coroutine and ensures that a Future-like object is returned. If it receives a Future, it returns that directly; if it receives a coroutine, it wraps it in a Task (which, as you'll recall, is a Future object), schedules it on the event loop, and returns the Task.
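
Both behaviours of `ensure_future` can be checked directly. This is a small sketch with a hypothetical `compute` coroutine, assuming Python 3.7+ for `asyncio.run` and `get_running_loop`.

```python
import asyncio

async def compute():
    await asyncio.sleep(0)
    return 42

async def main():
    loop = asyncio.get_running_loop()

    # coroutine in -> Task out (already scheduled on the running loop)
    task = asyncio.ensure_future(compute())

    # Future in -> the very same Future object back
    fut = loop.create_future()
    same = asyncio.ensure_future(fut) is fut
    fut.set_result(None)  # resolve the placeholder so nothing is left pending

    return (
        isinstance(task, asyncio.Task),
        isinstance(task, asyncio.Future),  # a Task is also a Future
        same,
        await task,
    )

checks = asyncio.run(main())
print(checks)
```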

In our case, `asynchronous_requests()` returns a coroutine, which we then schedule using `ensure_future`. By calling `loop.run_until_complete`, we run the event loop until `asynchronous_requests()` resolves to a value, i.e `await asyncio.gather(*tasks)` resolves to a value, i.e our tasks complete.

### Step 1.2 - Asynchronous HTTP

The `fetch_async` function looks confusing mainly because of syntax. It essentially waits for two Future-like objects: first, for the Future returned by `session.get` to resolve to a `response` object; then, for the Future returned by `response.read` to resolve to the actual response body.

This two-step process exists for lazy evaluation: if we didn't need the response body (e.g. we only cared about the status), we could access `response.status` without having to wait for the body to be read.

The `async with` statement is just convenient syntax to make sure that the `response` object is properly released once execution exits the block.

In [None]:
async def fetch_async (session, i):
    # wait for the response object from our GET request
    async with session.get(url) as response:
        # if we just needed e.g status, could have returned response.status without waiting
        
        # yield until the body has been read, and then return the value
        return await response.read()

`asynchronous_requests` is a little more involved. First, we obtain a `ClientSession` object, which is an object through which `aiohttp` exposes its HTTP API. Don't worry much about `ClientSession` -- just know that you should reuse the same `ClientSession` object when making multiple requests (since `aiohttp` does some optimizations internally).

More interesting is the loop, which stores into a list the Future objects representing the eventual return values of our `fetch_async` calls. Note that the loop itself doesn't `await` anything, so it completes nearly instantly. Only when we `await asyncio.gather` do we "pause" the function and yield to other tasks until the Futures resolve. Indeed, all `asyncio.gather` does is return a Future object that resolves only when every task in `tasks` has resolved. (The `*tasks` syntax is just a Python way to unpack a list as individual arguments to a function.)

In [None]:
async def asynchronous_requests ():
    tasks = []
    
    # obtain the session to make HTTP requests
    async with aiohttp.ClientSession() as session:
        
        # accumulate a bunch of Future objects, one for each `fetch_async` call
        for i in range(num_requests):
            task = asyncio.ensure_future(fetch_async(session, i))
            tasks.append(task)
    
        # yield execution (i.e don't return) until all of the gathered Future objects have resolved
        return await asyncio.gather(*tasks)

You might be tempted to draw an analogy between `asyncio.gather` and a function that loops through the tasks, `await`ing and accumulating the result to a list:

In [11]:
async def asynchronous_requests_alt ():
    results = []

    async with aiohttp.ClientSession() as session:
        
        # instead of gathering them all at once, what if we wait for responses one at a time?
        for i in range(num_requests):
            page = await fetch_async(session, i)
            results.append(page)

    return results

Unlike `asyncio.gather`, this does *not* take advantage of the event loop. Each `await` inside the loop pauses the coroutine until that request completes, so the next iteration (and the next request) cannot start in the meantime. The second request is only sent once the first has finished, i.e the requests run one after another, just like the synchronous version.

In [12]:
loop = asyncio.get_event_loop()
fut = asyncio.ensure_future(asynchronous_requests_alt())
loop.run_until_complete(fut)
print(len(fut.result()))

Starting 0
Got back 0
Starting 1
Got back 1
Starting 2
Got back 2
3
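
The difference between awaiting inside a loop and gathering can also be timed without touching the network. In this sketch, `fake_request`, `one_at_a_time`, `all_at_once`, and `timed` are made-up helpers, `asyncio.sleep(0.1)` stands in for a request, and `asyncio.run` assumes Python 3.7+.

```python
import asyncio
import time

async def fake_request(i):
    # stand-in for an HTTP request that takes ~0.1s
    await asyncio.sleep(0.1)
    return i

async def one_at_a_time(n):
    # each await finishes before the next request is even created
    return [await fake_request(i) for i in range(n)]

async def all_at_once(n):
    # all requests are in flight together; gather keeps results in input order
    return await asyncio.gather(*(fake_request(i) for i in range(n)))

def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

sequential_result, sequential_time = timed(one_at_a_time(3))
concurrent_result, concurrent_time = timed(all_at_once(3))

print("one at a time: {:.2f}s".format(sequential_time))
print("all at once:   {:.2f}s".format(concurrent_time))
```

Both versions return the same results in the same order; only the total waiting time differs, since the gathered requests overlap.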


### Step 1.3 - Putting it together

Now that `asyncio` hopefully makes more sense, we can adapt our `get_reviews` function to run asynchronously:

In [9]:
async def fetch_url_async (session, url, fut):
    """
    Same as fetch_url, except with async requests.
    """
    parsed_reviews = []
    while url:
        # make the request asynchronously like above
        async with session.get(url) as response:
            html = await response.read()

            # for non-async functions like parse_page, no need to await
            reviews, url = parse_page(html)
            parsed_reviews += reviews
            
    # indicate we are done by resolving the future
    fut.set_result(parsed_reviews)

async def get_reviews_async (urls):
    """
    Same as get_reviews, except with `fetch_url_async` for making async HTTP requests,
    and `asyncio.gather` for waiting for all Tasks in `tasks` to resolve.
    """
    tasks = []
    
    async with aiohttp.ClientSession() as session:
        for url in urls:
            # create a Future that will be resolved in `fetch_url_async`
            fut = asyncio.Future()
            
            # schedule `fetch_url_async`
            asyncio.ensure_future(fetch_url_async(session, url, fut))
            
            # accumulate these Futures
            tasks.append(fut)
        
        # wait for all the accumulated Futures to be resolved
        res = await asyncio.gather(*tasks)
        
        # res is a 2d list where 0th element is results from 0th url, 1st element from 1st url, etc.
        # all we're doing here is flattening the 2d list so we have one big list of results.
        return [r for url_results in res for r in url_results]

And to see if there's improvement:

In [11]:
start = time.time()

loop = asyncio.get_event_loop()
fut = asyncio.ensure_future(get_reviews_async(urls[:5]))
res = loop.run_until_complete(fut)

end = time.time()

print(len(res))
print("Took: {}s".format(end - start))

3652
Took: 411.61619210243225s


Great! We just saved roughly 140s with minimal changes. However, we still haven't decoupled requesting and processing: `parse_page` is a regular blocking function, so while it runs the event loop is blocked and no other requests can be sent or resumed.
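
The cost of a blocking call inside a coroutine can be isolated in a small experiment. In the sketch below, `fetch_then_parse` and `run_all` are hypothetical names, `asyncio.sleep` plays the role of the request, and `time.sleep` plays the role of a blocking `parse_page`. The non-blocking branch is purely for comparison and assumes a parse step that could yield, which the real one cannot. `asyncio.run` assumes Python 3.7+.

```python
import asyncio
import time

async def fetch_then_parse(blocking):
    await asyncio.sleep(0.1)      # simulated request: yields to the loop
    if blocking:
        time.sleep(0.1)           # simulated parse_page: holds the loop hostage
    else:
        await asyncio.sleep(0.1)  # hypothetical parse step that yields instead

async def run_all(blocking, n=3):
    start = time.perf_counter()
    await asyncio.gather(*(fetch_then_parse(blocking) for _ in range(n)))
    return time.perf_counter() - start

blocking_time = asyncio.run(run_all(blocking=True))
yielding_time = asyncio.run(run_all(blocking=False))

print("blocking parse: {:.2f}s".format(blocking_time))
print("yielding parse: {:.2f}s".format(yielding_time))
```

With a blocking parse step, the simulated requests still overlap, but the parse calls run strictly one after another on the single event-loop thread.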

## Idea 2 - Multiprocessing

In `asyncio`, the event loop should never block. With cooperative multitasking, if one task decides not to yield, there's nothing stopping it from running forever. This is why CPU-bound tasks don't mix well with `asyncio`: on a single thread, a CPU-intensive task will completely hog the processor. But! If we multi-thread, the OS can schedule each thread to run for a proportionate amount of time.

### GIL

Multithreading exists in Python, with a caveat: only one thread can execute Python bytecode at a time. This is due to Python's [Global Interpreter Lock (GIL)](https://en.wikipedia.org/wiki/Global_interpreter_lock), which trades parallel execution of multiple threads for efficient single-threaded processing. Threads release the lock while they wait on IO, so they help with IO-bound work, but a thread doing heavy computation still competes with the event loop's thread for the interpreter. We won't go into it here; the main takeaway is that multithreading alone won't run CPU-bound work in parallel with the event loop.

### Step 2.1 - Multiprocessing

The workaround is to replace threads with processes. This sidesteps the GIL because a multi-core machine can just run multiple processes in parallel, with a separate GIL in each. Luckily, `asyncio` already has a function for delegating computation to a pool of separate processes.

First, the code (only the commented lines have changed):

In [2]:
async def fetch_url_async (pool, session, url, fut):
    """
    An implementation of `fetch_url_async` that delegates long-running computation to a process pool
    """
    parsed_reviews = []
    while url:
        async with session.get(url) as response:
            html = await response.read()

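            # do the heavy computation in a separate process
            # (look up the running loop here rather than relying on a global `loop`)
            loop = asyncio.get_event_loop()
            reviews, url = await loop.run_in_executor(pool, parse_page, html)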
            
            parsed_reviews += reviews

    fut.set_result(parsed_reviews)

async def get_reviews_async (pool, urls):
    """
    Pretty much identical to previous `get_reviews_async`, except now we pass the given process pool
    `pool` to `fetch_url_async`.
    """
    tasks = []
    
    async with aiohttp.ClientSession() as session:    
        for url in urls:
            task = asyncio.Future()
            
            # the only difference is that we pass the pool executor
            asyncio.ensure_future(fetch_url_async(pool, session, url, task))
            
            tasks.append(task)
        
        res = await asyncio.gather(*tasks)
        return [r for url_results in res for r in url_results]

Notably, we defer the heavy computation in `parse_page` to a pool of processes using `run_in_executor`. The `run_in_executor` function schedules the provided function to be run in the pool, returning a Future that resolves once the function returns. Note that `run_in_executor` works for non-async functions like `parse_page` because the function is not actually yielding to the event loop, but running in a separate process, where it can block without affecting the original process.

To create the pool, we use `ProcessPoolExecutor`. Finding a value for `num_processes` requires some intuition and trial and error, but you can always omit the argument: `ProcessPoolExecutor` will just use as many workers as you have processors on your machine.

In [7]:
start = time.time()

# initialize process pool
num_processes = 3
pool = ProcessPoolExecutor(num_processes)

# kick off the event loop
loop = asyncio.get_event_loop()
fut = asyncio.ensure_future(get_reviews_async(pool, urls[:5]))
res = loop.run_until_complete(fut)

end = time.time()

print(len(res))
print("Took {}s".format(end - start))

3652
Took 379.3171589374542s


Awesome: we've reduced the total from roughly 550s to 380s. If the `parse_page` step were even more CPU-intensive, and if the machine had more cores, the advantage would likely be even more pronounced.
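
The same delegation pattern can be tried in isolation, without the network or `scraper.py`. The sketch below is a stand-in, not the code used for the timings above: the built-in `sum` over a large `range` plays the role of a CPU-heavy `parse_page`, and `crunch`, `crunch_all`, `run_demo`, and the pool size of 2 are assumptions chosen for illustration. It assumes Python 3.7+ for `asyncio.run` and `get_running_loop`.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def crunch(pool, n):
    loop = asyncio.get_running_loop()
    # the built-in sum over a big range stands in for a CPU-heavy parse_page;
    # it runs in a worker process while this coroutine yields to the loop
    return await loop.run_in_executor(pool, sum, range(n))

async def crunch_all(sizes):
    # a small pool of 2 worker processes (an arbitrary choice for this sketch)
    with ProcessPoolExecutor(2) as pool:
        return await asyncio.gather(*(crunch(pool, n) for n in sizes))

def run_demo():
    return asyncio.run(crunch_all([10, 1000, 2000000]))

if __name__ == "__main__":
    # the guard keeps worker processes from re-running this on import
    print(run_demo())
```

Functions handed to a process pool have to be picklable, which is why the example passes a built-in rather than a lambda or a nested function.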

## Summary

- prefer `asyncio` when working with IO-bound tasks (waiting on network, disk, etc.)
- use multiprocessing to parallelize CPU-bound tasks

## References

**Stuff we covered**  
- [Event loops, coroutines, futures, tasks](http://masnun.com/2015/11/20/python-asyncio-future-task-and-the-event-loop.html)
- [Using coroutines with processes](https://pymotw.com/3/asyncio/executors.html)
- [Challenges with multiprocessing + asyncio](http://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces)
- [Threads vs processes vs asyncio](https://www.youtube.com/watch?v=B0Qfe3U_hKU)

**Stuff to explore (Pipelining with multiprocessing)**  
- [Brief examples of Process, Queue, and Lock in multiprocessing](http://toastdriven.com/blog/2008/nov/11/brief-introduction-multiprocessing/)
- [Basics of multiprocessing](https://pymotw.com/2/multiprocessing/basics.html)
- [Communicating between processes with Queues](https://pymotw.com/2/multiprocessing/communication.html)
- [Pipelining (and also doing async stuff with other async libraries)](https://www.youtube.com/watch?v=jq2IFUQRbGo&index=37&list=PL2k6bbM_wgju204mCEyw3bmDH62dp_sLu)

**API References**  
- [ProcessPoolExecutor](https://docs.python.org/3.2/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor)
- [Tasks and coroutines](https://docs.python.org/3/library/asyncio-task.html)
- [aiohttp ClientSession and ClientResponse](http://aiohttp.readthedocs.io/en/stable/client_reference.html)
- [ensure_future](https://github.com/python/asyncio/blob/f9b0d516fa60f9da35e87da344e365f604281ccf/asyncio/tasks.py#L548)

**Related tutorials**
- [Building a web crawler with asyncio](http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html)
- [Building a multiplayer game in Python (Queues, Processes, asyncio)](https://7webpages.com/blog/writing-online-multiplayer-game-with-python-and-asyncio-writing-game-loop/)
- [Making a million requests with asyncio](https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html)
- [Asynchronous Python](http://ntoll.org/article/asyncio)
- [How await/async actually works](http://www.snarky.ca/how-the-heck-does-async-await-work-in-python-3-5)
