## Concurrency with coroutines

Guido:
>We entice you with a promise. It is possible to write asynchronous code that combines the efficiency of callbacks with the classic good looks of multithreaded programming. This combination is achieved with a pattern called "coroutines". Using Python 3.4's standard asyncio library, and a package called "aiohttp", fetching a URL in a coroutine is very direct:


```python
@asyncio.coroutine
def fetch(self, url):
    response = yield from self.session.get(url)
    body = yield from response.read()
```

In Python 3.5 it is even clearer, using the new `async`/`await` syntax:

```python
async def fetch(self, url):
    response = await self.session.get(url)
    body = await response.read()
```
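The snippets above need a real aiohttp session. Below is a self-contained sketch of the same `fetch` shape. The `FakeSession` and `FakeResponse` classes are hypothetical stand-ins for aiohttp: they just sleep briefly and return canned bytes. The code uses `asyncio.run`, which needs Python 3.7+.

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in for an aiohttp response object."""

    def __init__(self, url):
        self.url = url

    async def read(self):
        await asyncio.sleep(0.01)  # pretend to wait for the body
        return ("body of " + self.url).encode()


class FakeSession:
    """Hypothetical stand-in for an aiohttp client session."""

    async def get(self, url):
        await asyncio.sleep(0.01)  # pretend to wait for the headers
        return FakeResponse(url)


class Fetcher:
    def __init__(self):
        self.session = FakeSession()

    async def fetch(self, url):
        # Each await suspends fetch until the awaited operation is done,
        # exactly where the 3.4 version used `yield from`.
        response = await self.session.get(url)
        body = await response.read()
        return body


body = asyncio.run(Fetcher().fetch("http://example.com/"))
print(body)  # b'body of http://example.com/'
```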

### Remembering yield from

```python
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager_subgen():
    print("    |sg>starting subgen")
    total = 0.0
    count = 0
    average = None
    while True:
        print("    |sg>  average yielded out", average)
        term = yield average
        print("    |sg>  term sent in", term)
        if term is None:
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)

def average_my_values_delegen_simple():
    agen = averager_subgen()
    print("  |dg>created a new averager", id(agen))
    overall_av = yield from agen
    # yield from drives agen until it returns, passing every sent value
    # through; its value is agen's return value, not an individual yield
    print("  |dg>now", overall_av)
    return overall_av
```

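The following sketch shows what `yield from` does. It is a hand-written delegating generator that forwards values in both directions and recovers the subgenerator's return value from `StopIteration.value`. This is a simplified version of the PEP 380 expansion: it handles `send()` only and ignores `throw()` and `close()` forwarding. `manual_delegate` and `averager` are hypothetical names.

```python
def averager():
    # Same protocol as averager_subgen, without the prints.
    total, count, average = 0.0, 0, None
    while True:
        term = yield average
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return (count, average)


def manual_delegate(subgen):
    """Roughly what `result = yield from subgen` does (send() only)."""
    try:
        value = next(subgen)           # prime the subgenerator
    except StopIteration as exc:
        return exc.value
    while True:
        sent = yield value             # pass the subgen's value up to our caller
        try:
            value = subgen.send(sent)  # pass the caller's value down
        except StopIteration as exc:
            return exc.value           # the subgen's return value


gen = manual_delegate(averager())
next(gen)                              # runs averager up to its first yield
outs = [gen.send(v) for v in [1, 2, 3]]
try:
    gen.send(None)                     # tells averager to stop
except StopIteration as exc:
    final = exc.value
print(outs, final)  # [1.0, 1.5, 2.0] (3, 2.0)
```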

```python
values_to_send = [1, 2, 3, 4, 5, 6]
```

```python
print("1. creating delegating generator")
delegating_gen = average_my_values_delegen_simple()
print("2. priming till yield from by sending in None")
next(delegating_gen)  # priming
print("3. in loop after first yield, None sent in")
for value in values_to_send:
    print(">>sending term", value)
    out = delegating_gen.send(value)
    print('<<getting running average', out)
print("4. Sending in None to terminate")
# The delegating generator now returns, so this send raises StopIteration
# carrying the Result; "5. DONE" is therefore never printed.
out = delegating_gen.send(None)
print('<<getting running average', out)
print("5. DONE")
```

```
1. creating delegating generator
2. priming till yield from by sending in None
  |dg>created a new averager 4886813600
    |sg>starting subgen
    |sg>  average yielded out None
3. in loop after first yield, None sent in
>>sending term 1
    |sg>  term sent in 1
    |sg>  average yielded out 1.0
<<getting running average 1.0
>>sending term 2
    |sg>  term sent in 2
    |sg>  average yielded out 1.5
<<getting running average 1.5
>>sending term 3
    |sg>  term sent in 3
    |sg>  average yielded out 2.0
<<getting running average 2.0
>>sending term 4
    |sg>  term sent in 4
    |sg>  average yielded out 2.5
<<getting running average 2.5
>>sending term 5
    |sg>  term sent in 5
    |sg>  average yielded out 3.0
<<getting running average 3.0
>>sending term 6
    |sg>  term sent in 6
    |sg>  average yielded out 3.5
<<getting running average 3.5
4. Sending in None to terminate
    |sg>  term sent in None
  |dg>now Result(count=6, average=3.5)

StopIteration: Result(count=6, average=3.5)
```

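The final `send(None)` ends the delegating generator, so Python raises `StopIteration`. Its `value` attribute carries what the delegating generator returned. A driver that wants that value catches the exception, as in this sketch. The `drive` and `delegator` helpers are hypothetical.

```python
from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
    total, count, average = 0.0, 0, None
    while True:
        term = yield average
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)


def delegator():
    # yield from evaluates to averager's return value
    result = yield from averager()
    return result


def drive(gen, values):
    """Prime gen, send every value, then send None and return gen's result."""
    next(gen)
    for v in values:
        gen.send(v)
    try:
        gen.send(None)
    except StopIteration as exc:
        return exc.value   # the generator's return value
    raise RuntimeError("generator did not finish")


result = drive(delegator(), [1, 2, 3, 4, 5, 6])
print(result)  # Result(count=6, average=3.5)
```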
### Back to the Future with co-routines

So far, in our callback-based example, we saw two concepts rearing their heads: the event loop, and `select` to figure out which events fired, so that we can call the corresponding callback. By defining the callbacks in the `Fetcher` class we were able to program with state.

Now we add some more concepts. The first is the future, which we saw earlier: the promise of a result.

```python
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
import socket
selector = DefaultSelector()
```

A future, as you might expect, is something with callbacks: it holds a result, plus a list of callbacks to run once that result is set.

```python
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)
```

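Here is a quick check of the `MyFuture` protocol in isolation. Callbacks registered with `add_done_callback` receive the future itself, and they run only when `set_result` is called. The class is repeated so the snippet runs on its own. The `seen` list is just for illustration.

```python
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


seen = []
f = MyFuture()
f.add_done_callback(lambda fut: seen.append(fut.result))
print(seen)            # [] : nothing has happened yet
f.set_result(b'data')
print(seen)            # [b'data'] : the callback ran with the future
```

Unlike `asyncio.Future`, this toy version never runs a callback that is added *after* the result was set. asyncio schedules such late callbacks with `call_soon`.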
```python
class Fetcher:

    def __init__(self, url, host):
        self.url = url
        self.host = host
        self.response = b''  # Empty array of bytes.

    def fetch(self):
        global stopped
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        f = MyFuture()

        # resolves the future by setting a result on it
        def on_connected():
            print('on connected cb ran', flush=True)
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        print("about to yield connection future", flush=True)
        yield f  # to the driver this looks like fetch "returned" the future,
        # but we have not lost our local state (nor had to store it on self);
        # a send() from the driver resumes execution here
        print('we were connected! now back in gen', flush=True)
        selector.unregister(sock.fileno())
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        sock.send(request.encode('ascii'))
        while True:
            print("in loop")
            # now create a new future for the data-receiving call
            f = MyFuture()
            def on_response():
                chunky = sock.recv(4096)  # 4k chunk size.
                f.set_result(chunky)
            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_response)
            # the driver restarts the generator by sending
            # the received data right back in
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                print("len(chunk)", len(chunk))
                self.response += chunk
            else:
                print("all read")
                stopped = True
                break
```

We need a "main" to yield to. This is the driver program. Notice that it wraps a coroutine. Its job is to prime the coroutine and then provide a mechanism to send results into it.

```python
# But when the future resolves, what resumes the generator? We need a coroutine driver. Let us call it "task":
# (this is our main)
class Task:
    def __init__(self, coro):
        self.coro = coro
        f = MyFuture()
        print(">>sending none to initial future", f)
        f.set_result(None)
        print("...stepping")
        self.step(f)
        print(">>>after priming")

    def step(self, future):
        try:
            print("sending", type(future.result))
            next_future = self.coro.send(future.result)
            print('got next future', next_future)

        except StopIteration:
            print("si")
            return None
        next_future.add_done_callback(self.step)
```

Each time the coroutine yields a future, the task registers `self.step` as a callback on that future, so `step` runs automatically when the future is resolved.

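The `Task` mechanics can be exercised without sockets. In this sketch the "event source" is a list of pending futures that we resolve by hand, in place of the selector callbacks. `pending`, `wait_for_value` and `worker` are hypothetical names, and the prints have been removed from `Task`.

```python
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = MyFuture()
        f.set_result(None)
        self.step(f)              # prime: run coro up to its first yield

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        # resume the coroutine when the yielded future resolves
        next_future.add_done_callback(self.step)


pending = []                      # futures waiting for an "event"


def wait_for_value():
    f = MyFuture()
    pending.append(f)
    return f


log = []


def worker():
    a = yield wait_for_value()    # suspends until someone resolves the future
    log.append(a)
    b = yield wait_for_value()
    log.append(b)
    log.append('done')


Task(worker())
print(log)                        # [] : worker is parked on its first future
pending.pop(0).set_result(1)      # plays the role of a selector callback
pending.pop(0).set_result(2)
print(log)                        # [1, 2, 'done']
```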
```python
stopped = False
def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()
```

```python
fetcher = Fetcher('/353/', 'xkcd.com')
Task(fetcher.fetch())
stopped = False
loop()
```

```
>>sending none to initial future <__main__.MyFuture object at 0x10378f3c8>
...stepping
sending <class 'NoneType'>
about to yield connection future
got next future <__main__.MyFuture object at 0x10378f4e0>
>>>after priming
on connected cb ran
sending <class 'NoneType'>
we were connected! now back in gen
in loop
got next future <__main__.MyFuture object at 0x10378f438>
sending <class 'bytes'>
len(chunk) 1384
in loop
got next future <__main__.MyFuture object at 0x10378f4a8>
sending <class 'bytes'>
len(chunk) 4096
in loop
got next future <__main__.MyFuture object at 0x10378f518>
sending <class 'bytes'>
len(chunk) 56
in loop
got next future <__main__.MyFuture object at 0x10378f550>
sending <class 'bytes'>
len(chunk) 2551
in loop
got next future <__main__.MyFuture object at 0x10378f588>
sending <class 'bytes'>
all read
si
```


#### What happened?

1. In the coroutine `fetch`, we yielded a future and paused. Control flowed all the way back to the task, our driver object.
2. Meanwhile, once `select` reported the socket ready, the loop ran the registered callback (`on_response`, for example), which called `set_result` on this future; that in turn ran the future's done-callbacks.
3. This done-callback, added in the `step` method of the `Task`, is the `step` method itself. It `send`s the result into the coroutine, where the chunk is accumulated into the response.
4. The coroutine runs until its next `yield`, and the cycle repeats.

It is key that the things yielded are futures. At a `yield`, the transfer of control feels as if a function call has returned, and the future handed out is how the driver knows when to resume the coroutine.

### Refactoring using generators

```python
# But when the future resolves, what resumes the generator? We need a coroutine driver. Let us call it "task":
# (this is our main)
class Task:
    def __init__(self, coro):
        self.coro = coro
        f = MyFuture()
        print(">>sending none to initial future", f)
        f.set_result(None)
        print("...stepping")
        self.step(f)
        print(">>>after priming")

    def step(self, future):
        try:
            print("sending", type(future.result))
            next_future = self.coro.send(future.result)
            print('got next future', next_future)

        except StopIteration:
            print("si")
            return None
        next_future.add_done_callback(self.step)
```

`Task`, our driver, is unchanged. Below, we refactor the reading of a single chunk into a `read` coroutine. This is our innermost coroutine, our subgenerator. It registers the callback and yields. Then, when the driver task sends the data back in, it returns the chunk.

```python
def read(sock):
    f = MyFuture()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk
```

Now multiple chunks are read using the `read_all` delegating coroutine. Each `yield from read(sock)` waits for possibly multiple yields in the subgenerator to finish (not the case here, since `read` yields only once). The subgenerator `read` returns after one read, and `chunk` is set to that return value. Finally, the joined bytes are returned.

One might wonder where the concurrency is, and why we need futures at all. After all, the chunks need to come in order. But using futures lets us potentially have other `read`s, for other URLs, going on at the same time, as we might need in a crawler.

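Here is a socket-free sketch of the concurrency point. Two coroutines each read "chunks" in order, and both are driven by the same `Task` class. A hypothetical `waiting` dict stands in for the selector. We resolve the futures in an interleaved order, and each coroutine still sees its own chunks sequentially. The `read` and `read_all` here take a name instead of a socket, which is an assumption made for illustration.

```python
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = MyFuture()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


waiting = {}   # name -> future that coroutine is currently parked on


def read(name):
    # innermost coroutine: park on a future until a "chunk" arrives
    f = MyFuture()
    waiting[name] = f
    chunk = yield f
    return chunk


def read_all(name, results):
    chunks = []
    chunk = yield from read(name)
    while chunk:                   # an empty chunk means end of stream
        chunks.append(chunk)
        chunk = yield from read(name)
    results[name] = b''.join(chunks)


results = {}
Task(read_all('a', results))
Task(read_all('b', results))

# Deliver chunks in an interleaved order, as a selector might.
for name, chunk in [('a', b'A1'), ('b', b'B1'), ('b', b'B2'),
                    ('a', b'A2'), ('b', b''), ('a', b'')]:
    waiting.pop(name).set_result(chunk)

print(results)  # {'b': b'B1B2', 'a': b'A1A2'}
```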
```python
def read_all(sock):
    global stopped
    response = []
    # Read the whole response. Arranging the reads like this
    # guarantees they happen in sequence.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    stopped = True  # tell our toy event loop to exit
    return b''.join(response)
```

Guido:
>If you squint and make the yield from statements disappear it looks like conventional functions doing blocking I/O. But in fact, read and read_all are coroutines. Yielding from read pauses read_all until the I/O completes. While read_all is paused, asyncio's event loop does other work and awaits other I/O events; read_all is resumed with the result of read on the next loop tick once its event is ready.

```python
class Fetcher:

    def __init__(self, host, url):
        self.url = url
        self.host = host
        self.response = b''  # Empty array of bytes.

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        f = MyFuture()

        def on_connected():
            print('on connected cb ran')
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        print("about to yield connection future")
        yield f
        print('connected!')
        selector.unregister(sock.fileno())
        # use self.host rather than a hard-coded Host header
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        sock.send(request.encode('ascii'))
        # delegate reading the whole response to read_all
        yield from read_all(sock)
```

Above, `fetch` now acts as a delegating generator for the response part, waiting until everything has been read and returned by `read_all`.

```python
fetcher = Fetcher('xkcd.com', '/353/')
Task(fetcher.fetch())
stopped = False
loop()
```

```
>>sending none to initial future <__main__.MyFuture object at 0x10379e860>
...stepping
sending <class 'NoneType'>
about to yield connection future
got next future <__main__.MyFuture object at 0x10379e128>
>>>after priming
on connected cb ran
sending <class 'NoneType'>
connected!
got next future <__main__.MyFuture object at 0x10379e1d0>
sending <class 'bytes'>
got next future <__main__.MyFuture object at 0x10379e160>
sending <class 'bytes'>
got next future <__main__.MyFuture object at 0x10379e208>
sending <class 'bytes'>
si
```


![](http://aosabook.org/en/500L/crawler-images/yield-from.png)

There is still a bare `yield f` left amongst the `yield from`s. For consistency, this can be fixed, and doing so also lets us change the future's implementation under the hood.

```python
def read(sock):
    f = MyFuture()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f  # Read one chunk; needs MyFuture.__iter__ (defined next)
    selector.unregister(sock.fileno())
    return chunk
```

By making the future iterable, we can `yield from` it instead of `yield`ing it. Notice that `__iter__` is careful to return the result. Its `yield self` transfers control to the task. Then, as before, the select callback sets the result and the task sends data in. At that point `__iter__` simply returns `self.result` to the calling `yield from`.

```python
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        print("cblist", self._callbacks)
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self            # hand the future itself up to the task
        return self.result    # becomes the value of `yield from future`
```

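The iterable future can be checked without sockets. In this sketch `Task` is again stripped of prints, and `read_value`, `main` and the hand-resolved `pending` list are hypothetical. Every level uses `yield from`. Only `MyFuture.__iter__` contains a bare `yield`, and its return value surfaces as the value of each `yield from`.

```python
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self            # hand the future up to the Task
        return self.result    # value of `yield from future`


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = MyFuture()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


pending = []


def read_value():
    f = MyFuture()
    pending.append(f)
    value = yield from f      # no bare yield needed at this level
    return value


def main(out):
    x = yield from read_value()
    y = yield from read_value()
    out.append(x + y)


out = []
Task(main(out))
pending.pop(0).set_result(20)
pending.pop(0).set_result(22)
print(out)  # [42]
```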
```python
class Fetcher:

    def __init__(self, host, url):
        self.url = url
        self.host = host
        self.response = b''  # Empty array of bytes.

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        f = MyFuture()

        def on_connected():
            print('on connected cb ran')
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        print("about to yield connection future")
        yield from f
        print('connected!')
        selector.unregister(sock.fileno())
        # use self.host rather than a hard-coded Host header
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        sock.send(request.encode('ascii'))
        # the value of yield from is read_all's return value
        self.response = (yield from read_all(sock))
        return self.response
```

```python
fetcher = Fetcher('xkcd.com', '/353/')
Task(fetcher.fetch())
stopped = False
loop()
```

```
>>sending none to initial future <__main__.MyFuture object at 0x1052840b8>
cblist []
...stepping
sending <class 'NoneType'>
about to yield connection future
got next future <__main__.MyFuture object at 0x103994400>
>>>after priming
on connected cb ran
cblist [<bound method Task.step of <__main__.Task object at 0x105284128>>]
sending <class 'NoneType'>
connected!
got next future <__main__.MyFuture object at 0x103994160>
cblist [<bound method Task.step of <__main__.Task object at 0x105284128>>]
sending <class 'bytes'>
got next future <__main__.MyFuture object at 0x103994710>
cblist [<bound method Task.step of <__main__.Task object at 0x105284128>>]
sending <class 'bytes'>
got next future <__main__.MyFuture object at 0x103994128>
cblist [<bound method Task.step of <__main__.Task object at 0x105284128>>]
sending <class 'bytes'>
got next future <__main__.MyFuture object at 0x103994160>
cblist [<bound method Task.step of <__main__.Task object at 0x105284128>>]
sending <class 'bytes'>
got nex
```

Let's make some remarks about what we saw:

- Because we now chain generators using `yield from`, control flows from the innermost subgenerator all the way out to the outermost piece of calling code (here the task, driven by the event loop) after EACH yield of the innermost subgenerator.
- Real event loops do more than dispatch IO callbacks. asyncio's loop, for instance, maintains a `call_soon` queue, so the priming we do in the `Task` initializer could instead be scheduled for the next tick of the loop by putting it on that queue.
- When the future completes, the callback scheduled by the task is the one for the outermost coroutine in the chain. This callback, `Task.step`, takes care of sending in the result.
- When the innermost subgenerator finishes, whatever it returns becomes the value of the enclosing `yield from`, which continues execution with this value. And so on, until the outermost coroutine completes and returns the value that, in asyncio, becomes the value of the Future that is the Task.

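The `call_soon` idea can be sketched in a few lines. A loop keeps a FIFO of ready callbacks and runs them one tick at a time. Work scheduled during a tick therefore runs on a later tick instead of immediately. `MiniLoop` is a hypothetical toy, not asyncio's implementation, and it has no selector or timers.

```python
from collections import deque


class MiniLoop:
    def __init__(self):
        self._ready = deque()

    def call_soon(self, fn, *args):
        # queue fn to run on a later tick rather than calling it now
        self._ready.append((fn, args))

    def run_once(self):
        # run only the callbacks that were ready when this tick started
        for _ in range(len(self._ready)):
            fn, args = self._ready.popleft()
            fn(*args)

    def run_until_idle(self):
        ticks = 0
        while self._ready:
            self.run_once()
            ticks += 1
        return ticks


order = []
loop = MiniLoop()


def first():
    order.append('first')
    loop.call_soon(third)      # scheduled for the next tick


def second():
    order.append('second')


def third():
    order.append('third')


loop.call_soon(first)
loop.call_soon(second)
ticks = loop.run_until_idle()
print(order, ticks)  # ['first', 'second', 'third'] 2
```

In the same spirit, `Task.__init__` could call `loop.call_soon(self.step, f)` instead of stepping immediately, so that the coroutine only starts once the loop is running.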
From http://www.bitdance.com/blog/2014/09/30_01_asycio_overview/ :
>To summarise at a slightly higher level, the overall flow in an asyncio program is that we execute procedural style code, and every time we get to a yield from statement the execution of that procedural code is suspended. This may go on for several levels of yield from call, but eventually a Future will be yielded and make its way back up to the Task, and we will start a new pass through the EventLoop. The EventLoop will then run any call_soon callbacks. When all call_soon callbacks have run, the EventLoop uses a selector to wait for the next IO event or the next callback that was scheduled to run at a specific time. Those IO or timed events will provide values that will be set on certain Future objects, which will trigger the scheduling of call_soon callbacks which will in turn cause the coroutines that were waiting for those Futures to be scheduled via call_soon to have next called on them and thus get another chance to run. This continues until all Futures are complete, including the Task or Tasks that the main EventLoop is waiting for (or the EventLoop is explicitly shut down).

### Sequential downloading

The test server is at:

https://dl.dropboxusercontent.com/u/75194/fls/web.py

Run it with:

`python -m aiohttp.web -H localhost -P 8000 fls.web:init_function`
```python
import requests
import time

def get_url(url):
    response = requests.get(url)
    print("1", url)
    return response.content

def get_many(urls):
    cdict = {}
    for url in urls:
        cdict[url] = get_url(url)
    return cdict

def download(download_func, urls):
    start = time.time()
    cdict = download_func(urls)
    elapsed = time.time() - start
    print("{} in {} secs".format(len(cdict), elapsed))
```

```python
trial_urls = ['http://localhost:8000/{}'.format(n) for n in range(50)]
```

```python
download(get_many, trial_urls)
```

```
1 http://localhost:8000/0
1 http://localhost:8000/1
1 http://localhost:8000/2
1 http://localhost:8000/3
1 http://localhost:8000/4
1 http://localhost:8000/5
1 http://localhost:8000/6
1 http://localhost:8000/7
1 http://localhost:8000/8
1 http://localhost:8000/9
1 http://localhost:8000/10
1 http://localhost:8000/11
1 http://localhost:8000/12
1 http://localhost:8000/13
1 http://localhost:8000/14
1 http://localhost:8000/15
1 http://localhost:8000/16
1 http://localhost:8000/17
1 http://localhost:8000/18
1 http://localhost:8000/19
1 http://localhost:8000/20
1 http://localhost:8000/21
1 http://localhost:8000/22
1 http://localhost:8000/23
1 http://localhost:8000/24
1 http://localhost:8000/25
1 http://localhost:8000/26
1 http://localhost:8000/27
1 http://localhost:8000/28
1 http://localhost:8000/29
1 http://localhost:8000/30
1 http://localhost:8000/31
1 http://localhost:8000/32
1 http://localhost:8000/33
1 http://localhost:8000/34
1 http://localhost:8000/35
1 http://localhost:8000/36
1 http://lo
```

### Sequential using async tech

Here we use the `aiohttp` library (`pip install aiohttp`). Note that the successive `yield from`s in `get_many_async` run one after another, so the downloads are still sequential: this is not yet a truly asynchronous program.

But it illustrates a key idea of async programming: write the code as if you were writing a blocking program, and then stick in `yield from`s.

The critical difference from our illustrative effort above is the event loop from asyncio. This event loop handles selecting on IO as well as timers, the scheduling of coroutines, and many other things. `loop.run_until_complete()` blocks until the top-level coroutine, here `get_many_async`, completes. So this sets up the entire flow we were talking about earlier.

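Without the test server, the sequential behavior can be reproduced with `asyncio.sleep` standing in for network latency. This sketch uses Python 3.5+ `async`/`await`, which is equivalent here to `@asyncio.coroutine`/`yield from`; the decorator was removed in Python 3.11. `fake_get` and the 0.05 s delay are assumptions.

```python
import asyncio
import time

DELAY = 0.05  # assumed per-request latency, in seconds


async def fake_get(n):
    await asyncio.sleep(DELAY)     # yields to the event loop while "waiting"
    return 'content {}'.format(n)


async def get_many_sequential(ns):
    results = {}
    for n in ns:
        # each await finishes before the next "request" starts
        results[n] = await fake_get(n)
    return results


start = time.perf_counter()
results = asyncio.run(get_many_sequential(range(5)))
elapsed = time.perf_counter() - start
print(len(results), round(elapsed, 2))  # 5 requests, roughly 5 * DELAY seconds
```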
```python
import asyncio, aiohttp

@asyncio.coroutine
def get_url_async(url):
    # async operation (aiohttp API of the time; current aiohttp releases
    # expect `async with`, e.g. via aiohttp.ClientSession)
    response = yield from aiohttp.request('GET', url)
    print("1", url)
    # reading the response is a separate async op
    content = yield from response.read()
    return content

@asyncio.coroutine
def get_many_async(urls):
    cdict = {}
    for url in urls:
        # each download finishes before the next one starts
        cdict[url] = yield from get_url_async(url)
    return cdict

def download_many(urls):
    loop = asyncio.get_event_loop()
    coro = get_many_async(urls)
    cdict = loop.run_until_complete(coro)
    return cdict

download(download_many, trial_urls)
```

```
1 http://localhost:8000/0
1 http://localhost:8000/1
1 http://localhost:8000/2
1 http://localhost:8000/3
1 http://localhost:8000/4
1 http://localhost:8000/5
1 http://localhost:8000/6
1 http://localhost:8000/7
1 http://localhost:8000/8
1 http://localhost:8000/9
1 http://localhost:8000/10
1 http://localhost:8000/11
1 http://localhost:8000/12
1 http://localhost:8000/13
1 http://localhost:8000/14
1 http://localhost:8000/15
1 http://localhost:8000/16
1 http://localhost:8000/17
1 http://localhost:8000/18
1 http://localhost:8000/19
1 http://localhost:8000/20
1 http://localhost:8000/21
1 http://localhost:8000/22
1 http://localhost:8000/23
1 http://localhost:8000/24
1 http://localhost:8000/25
1 http://localhost:8000/26
1 http://localhost:8000/27
1 http://localhost:8000/28
1 http://localhost:8000/29
1 http://localhost:8000/30
1 http://localhost:8000/31
1 http://localhost:8000/32
1 http://localhost:8000/33
1 http://localhost:8000/34
1 http://localhost:8000/35
1 http://localhost:8000/36
1 http://lo
```

### Totally asynchronous

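To make the synchronous-looking async code above truly asynchronous, the top-level coroutine must stop `yield from`-ing each download in turn. Instead, every download coroutine is wrapped in a future up front, so all of them are scheduled on the event loop at once, and only then do we wait on those futures. This is the key use of futures: they let operations that would otherwise run one after another be in flight at the same time.

`asyncio.as_completed(coroutines)` wraps the coroutine objects in futures and returns an iterator over awaitables that produce the results in the order the downloads complete. These futures are defined by the asyncio library, not by our implementation above.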

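The completion-order behavior of `as_completed` can be seen with simulated delays. This sketch uses `async`/`await`. `fake_get` and the delays are assumptions chosen so that the submission order differs from the finishing order.

```python
import asyncio


async def fake_get(n, delay):
    await asyncio.sleep(delay)   # stand-in for network latency
    return n


async def get_many_concurrent(delays):
    todos = [fake_get(n, d) for n, d in enumerate(delays)]
    results = []
    # as_completed wraps every coroutine in a Task (so all run at once) and
    # yields awaitables that produce results in completion order
    for next_done in asyncio.as_completed(todos):
        results.append(await next_done)
    return results


order = asyncio.run(get_many_concurrent([0.06, 0.02, 0.04]))
print(order)  # [1, 2, 0]: shortest delay first, not submission order
```

The total run time here is close to the longest delay rather than the sum of the delays, which is the point of scheduling everything before waiting.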

```python
import asyncio, aiohttp, time


def download(download_func, urls):
    start = time.time()
    cdict = download_func(urls)
    elapsed = time.time() - start
    print("{} in {} secs".format(len(cdict), elapsed))

@asyncio.coroutine
def get_url_async(url):
    # async operation
    response = yield from aiohttp.request('GET', url)
    print("1", url)
    # reading the response is a separate async op
    content = yield from response.read()
    return content


@asyncio.coroutine
def get_many_async(urls):
    todos = [get_url_async(url) for url in urls]
    results = []
    for future in asyncio.as_completed(todos):
        results.append((yield from future))
    return results

def download_many(urls):
    loop = asyncio.get_event_loop()
    coro = get_many_async(urls)
    clist = loop.run_until_complete(coro)
    return clist

download(download_many, trial_urls)
```

```
1 http://localhost:8000/18
1 http://localhost:8000/26
1 http://localhost:8000/37
1 http://localhost:8000/43
1 http://localhost:8000/14
1 http://localhost:8000/25
1 http://localhost:8000/46
1 http://localhost:8000/42
1 http://localhost:8000/13
1 http://localhost:8000/0
1 http://localhost:8000/30
1 http://localhost:8000/27
1 http://localhost:8000/7
1 http://localhost:8000/9
1 http://localhost:8000/44
1 http://localhost:8000/28
1 http://localhost:8000/36
1 http://localhost:8000/31
1 http://localhost:8000/12
1 http://localhost:8000/23
1 http://localhost:8000/15
1 http://localhost:8000/32
1 http://localhost:8000/4
1 http://localhost:8000/19
1 http://localhost:8000/17
1 http://localhost:8000/47
1 http://localhost:8000/21
1 http://localhost:8000/16
1 http://localhost:8000/39
1 http://localhost:8000/20
1 http://localhost:8000/3
1 http://localhost:8000/24
1 http://localhost:8000/10
1 http://localhost:8000/48
1 http://localhost:8000/29
1 http://localhost:8000/41
1 http://localhost:8000/1
1 http:
```

Just as we yielded from futures above (in addition to yielding from `read_all`), we do the same in `asyncio`. Indeed, from *Fluent Python*:

>Using yield from with a future automatically takes care of waiting for it to finish, without blocking the event loop—because in asyncio, yield from is used to give control back to the event loop.
>
>Note that using yield from with a future is the coroutine equivalent of the functionality offered by add_done_callback: instead of triggering a callback, when the delayed operation is done, the event loop sets the result of the future, and the yield from expression produces a return value inside our suspended coroutine, allowing it to resume.

In asyncio (and in our example) you can `yield from` either a future or a coroutine. In both cases the resolution of the future advances us past the `yield from` by performing a `send`. But who does this?

In our case it was the `Task` class, and we wrapped the `fetcher.fetch()` coroutine in it. Indeed, this is precisely what happens in asyncio. From *Fluent Python*:

>In order to execute, a coroutine must be scheduled, and then it’s wrapped in an asyncio.Task. Given a coroutine, there are two main ways of obtaining a Task:
>
>- `asyncio.async(coro_or_future, *, loop=None)`
>  This function unifies coroutines and futures: the first argument can be either one. If it’s a Future or Task, it’s returned unchanged. If it’s a coroutine, async calls loop.create_task(...) on it to create a Task.
>- `BaseEventLoop.create_task(coro)`
>  This method schedules the coroutine for execution and returns an asyncio.Task object. If called on a custom subclass of BaseEventLoop, the object returned may be an instance of some other Task-compatible class provided by an external library (e.g., Tornado).
>
>Several asyncio functions accept coroutines and wrap them in asyncio.Task objects automatically, using asyncio.async internally. One example is `BaseEventLoop.run_until_complete(...)`.

It turns out that a `Task` IS a `Future` (`asyncio.Task` is a subclass of `asyncio.Future`). This makes sense, as it is what the event loop waits on to complete. Indeed, from the docs:

>A task is responsible for executing a coroutine object in an event loop. If the wrapped coroutine yields from a future, the task suspends the execution of the wrapped coroutine and waits for the completion of the future. When the future is done, the execution of the wrapped coroutine restarts with the result or the exception of the future.

>Event loops use cooperative scheduling: an event loop only runs one task at a time. Other tasks may run in parallel if other event loops are running in different threads. While a task waits for the completion of a future, the event loop executes a new task.

>The loop.run_until_complete function accepts a future or a coroutine. If it gets a coroutine, run_until_complete wraps it into a Task, similar to what wait does. Coroutines, futures, and tasks can all be driven by yield from, and this is what run_until_complete does...

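These relationships can be checked directly with modern asyncio. The quoted `asyncio.async` was deprecated in Python 3.4.4 in favor of `asyncio.ensure_future`, and `async` became a reserved keyword in Python 3.7, so this sketch uses `ensure_future`. The `answer` coroutine is hypothetical.

```python
import asyncio


async def answer():
    await asyncio.sleep(0)
    return 42


async def main():
    task = asyncio.ensure_future(answer())  # wraps the coroutine in a Task
    same = asyncio.ensure_future(task)      # a Future/Task is returned unchanged
    value = await task                      # awaiting the Task awaits its coroutine
    return task, same, value


task, same, value = asyncio.run(main())
print(isinstance(task, asyncio.Task))       # True
print(isinstance(task, asyncio.Future))     # True: a Task is a Future
print(same is task, value, task.result())   # True 42 42
```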
Notice something interesting, something you have seen before:

- A coroutine chain must ultimately be driven by a caller that is not a coroutine: here, the event loop (asyncio has the loop do the `send`).
- The innermost subgenerator must be a plain generator or iterable with a bare `yield`, like our `MyFuture.__iter__`. In asyncio that role is played by `asyncio.Future`, whose `__iter__` yields the future itself. Library coroutines such as `aiohttp.request` or `asyncio.sleep` ultimately bottom out in such a future.

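With native coroutines, the role of `MyFuture.__iter__` is played by `__await__`. The hypothetical `Ticks` object below is an innermost awaitable with no `await` or `yield from` inside; it only yields bare `None` values. asyncio's Task treats a bare yield as "give other tasks a turn, then resume me", which is the same trick `asyncio.sleep(0)` relies on. That is an implementation behavior of asyncio's Task rather than a documented API, so treat this as illustrative only.

```python
import asyncio


class Ticks:
    """Hypothetical awaitable: lets other tasks run `n` times, then returns n."""

    def __init__(self, n):
        self.n = n

    def __await__(self):
        for _ in range(self.n):
            yield          # bare yield: the Task hands control back to the loop
        return self.n      # becomes the value of `await Ticks(n)`


log = []


async def worker(name, n):
    got = await Ticks(n)
    log.append((name, got))


async def main():
    await asyncio.gather(worker('slow', 3), worker('fast', 1))


asyncio.run(main())
print(log)  # [('fast', 1), ('slow', 3)]
```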
### Threads, for comparison

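```python
import concurrent.futures  # `import concurrent` alone does not load the submodule

MAX_WORKERS = 20

def get_many_threaded(urls):
    workers = min(MAX_WORKERS, len(urls))
    with concurrent.futures.ThreadPoolExecutor(workers) as executor:
        # map runs get_url in worker threads and returns results in input order
        res = executor.map(get_url, urls)
    return dict(zip(urls, res))

download(get_many_threaded, trial_urls)
```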

```
1 http://localhost:8000/14
1 http://localhost:8000/13
1 http://localhost:8000/7
1 http://localhost:8000/2
1 http://localhost:8000/5
1 http://localhost:8000/15
1 http://localhost:8000/10
1 http://localhost:8000/4
1 http://localhost:8000/3
1 http://localhost:8000/8
1 http://localhost:8000/12
1 http://localhost:8000/18
1 http://localhost:8000/17
1 http://localhost:8000/1
1 http://localhost:8000/9
1 http://localhost:8000/16
1 http://localhost:8000/11
1 http://localhost:8000/6
1 http://localhost:8000/0
1 http://localhost:8000/19
1 http://localhost:8000/34
1 http://localhost:8000/21
1 http://localhost:8000/22
1 http://localhost:8000/20
1 http://localhost:8000/28
1 http://localhost:8000/24
1 http://localhost:8000/23
1 http://localhost:8000/36
1 http://localhost:8000/25
1 http://localhost:8000/39
1 http://localhost:8000/29
1 http://localhost:8000/30
1 http://localhost:8000/26
1 http://localhost:8000/27
1 http://localhost:8000/32
1 http://localhost:8000/33
1 http://localhost:8000/38
1 http://lo
```

Why not use the threaded version? I'll quote from *Fluent Python*:
>One final point related to threads versus coroutines: if you’ve done any nontrivial programming with threads, you know how challenging it is to reason about the program because the scheduler can interrupt a thread at any time. You must remember to hold locks to protect the critical sections of your program, to avoid getting interrupted in the middle of a multistep operation—which could leave data in an invalid state.
>
>With coroutines, everything is protected against interruption by default. You must explicitly yield to let the rest of the program run. Instead of holding locks to synchronize the operations of multiple threads, you have coroutines that are “synchronized” by definition: only one of them is running at any time. And when you want to give up control, you use yield or yield from to give control back to the scheduler. That’s why it is possible to safely cancel a coroutine: by definition, a coroutine can only be cancelled when it’s suspended at a yield point, so you can perform cleanup by handling the CancelledError exception.

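The cancellation point can be sketched with modern asyncio (`asyncio.create_task` needs Python 3.7+). The coroutine is cancelled while suspended at its `await`. It gets a chance to clean up in an `except asyncio.CancelledError` block, then re-raises so the Task is marked cancelled. The `events` list is just for illustration.

```python
import asyncio

events = []


async def worker():
    try:
        await asyncio.sleep(10)      # suspended here when cancelled
    except asyncio.CancelledError:
        events.append('cleanup')     # safe place to release resources
        raise                        # re-raise so the Task is marked cancelled


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)           # let worker start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append('cancelled')
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(events, was_cancelled)  # ['cleanup', 'cancelled'] True
```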
### File IO and blocking calls

Callbacks in single-threaded JavaScript and Python work because something else, in user space or at the OS level, relies on interrupts, threads, polling, multiple processes, etc. to make sure the work gets done, while our own thread never blocks.

The same is true for coroutines, which multitask cooperatively.

Blocking code breaks this arrangement, and file I/O is the usual example. The selectors we use to wait on sockets do not work for regular files, so a file write simply blocks. In a threaded version, the file I/O would release the GIL and other threads would make progress. With coroutines, the whole event loop would freeze until the write finished. To avoid this, the code below uses `run_in_executor`, which runs the blocking function in a thread pool (the loop's default executor when the first argument is `None`).

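Here is a self-contained sketch of the same `run_in_executor` pattern that needs no server. It writes a few byte strings to a temporary directory, and each write runs in the default thread pool. The `save_file`/`save_all` helpers, the file-name scheme and the sample contents are assumptions. Awaiting the returned futures (here via `asyncio.gather`) surfaces any exceptions and guarantees the writes finished.

```python
import asyncio
import os
import tempfile
import uuid


def save_file(directory, content):
    """Blocking write; meant to run in a worker thread."""
    fname = os.path.join(directory, 'dload-{}.html'.format(uuid.uuid4()))
    with open(fname, 'wb') as fd:   # content is bytes, so write in binary mode
        fd.write(content)
    return fname


async def save_all(directory, contents):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; awaiting the
    # returned futures keeps the loop free while each write runs in a thread
    futures = [loop.run_in_executor(None, save_file, directory, c)
               for c in contents]
    return await asyncio.gather(*futures)


with tempfile.TemporaryDirectory() as tmp:
    names = asyncio.run(save_all(tmp, [b'<html>1</html>', b'<html>2</html>']))
    sizes = [os.path.getsize(n) for n in names]
print(sizes)  # [14, 14]
```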
```python
import asyncio, aiohttp, time
import uuid


def download(download_func, urls):
    start = time.time()
    cdict = download_func(urls)
    elapsed = time.time() - start
    print("{} in {} secs".format(len(cdict), elapsed))

@asyncio.coroutine
def get_url_async(url):
    # async operation
    response = yield from aiohttp.request('GET', url)
    print("1", url)
    # reading the response is a separate async op
    content = yield from response.read()
    return content

def save_file(content):
    # blocking file write; runs in a worker thread via run_in_executor
    fname = str(uuid.uuid4())
    # content is bytes, so write it in binary mode
    with open("/tmp/dload-" + fname + '.html', 'wb') as fd:
        fd.write(content)
    return fname

@asyncio.coroutine
def download_and_save_one(url):
    c = yield from get_url_async(url)
    loop = asyncio.get_event_loop()
    # None selects the default thread-pool executor; yielding from the
    # returned future waits for the write without blocking the loop
    yield from loop.run_in_executor(None, save_file, c)
    return c

@asyncio.coroutine
def get_many_async(urls):
    todos = [download_and_save_one(url) for url in urls]
    results = []
    for future in asyncio.as_completed(todos):
        results.append((yield from future))
    return results

def download_many(urls):
    loop = asyncio.get_event_loop()
    coro = get_many_async(urls)
    clist = loop.run_until_complete(coro)
    return clist

download(download_many, trial_urls)
```

1 http://localhost:8000/30
1 http://localhost:8000/8
1 http://localhost:8000/47
1 http://localhost:8000/5
1 http://localhost:8000/14
1 http://localhost:8000/2
1 http://localhost:8000/29
1 http://localhost:8000/20
1 http://localhost:8000/11
1 http://localhost:8000/45
1 http://localhost:8000/6
1 http://localhost:8000/21
1 http://localhost:8000/33
1 http://localhost:8000/32
1 http://localhost:8000/17
1 http://localhost:8000/22
1 http://localhost:8000/44
1 http://localhost:8000/3
1 http://localhost:8000/48
1 http://localhost:8000/12
1 http://localhost:8000/23
1 http://localhost:8000/35
1 http://localhost:8000/4
1 http://localhost:8000/27
1 http://localhost:8000/39
1 http://localhost:8000/1
1 http://localhost:8000/41
1 http://localhost:8000/10
1 http://localhost:8000/7
1 http://localhost:8000/38
1 http://localhost:8000/15
1 http://localhost:8000/31
1 http://localhost:8000/25
1 http://localhost:8000/49
1 http://localhost:8000/16
1 http://localhost:8000/13
1 http://localhost:8000/24
1 http://

In [12]:
! ls /tmp

569fc150184ba                                   dload-6508ca3a-3cc7-498f-923a-1bde3d3b4f4a.html
569fc1501cc12                                   dload-66ba2ce1-60a1-4325-9a22-9bf5ba0f5170.html
56d64375dbf85                                   dload-683aeb05-92bb-492a-8da6-ddf9c7694512.html
Atom Crashes                                    dload-68566095-e704-40b6-9b76-ad850b052d89.html
B92A7692-2496-452C-AC4C-1DAD18694C9B_IN         dload-72bc18eb-5959-4bbf-901e-96ac99389b9c.html
B92A7692-2496-452C-AC4C-1DAD18694C9B_OUT        dload-74265dd9-6d8e-4ed2-8080-9dff77883403.html
dload-046ac6e1-9cea-417e-b85a-8d2fd0d1c972.html dload-7f7321dd-6b60-43c7-8707-c66e428baf46.html
dload-0493bd09-b591-4334-bf2e-a323387b46bb.html dload-95c36af8-8ef7-4ab4-bf27-1f49552830eb.html
dload-05480f0c-e08b-4292-aabf-bbf8a7aa3fc7.html dload-98d9e181-6876-468a-ab57-e146a8e78374.html
dload-09af794e-221a-4f9d-8e77-6ecaeedcd4db.html dload-99b

In [13]:
!rm /tmp/dload-*.html; ls /tmp

569fc150184ba                            launch-NAkmmJ
569fc1501cc12                            launch-YzQTyI
56d64375dbf85                            launchd-157.m3nlro
Atom Crashes                             launchd-3256.sO9Kik
B92A7692-2496-452C-AC4C-1DAD18694C9B_IN  launchd-42570.yMUzAO
B92A7692-2496-452C-AC4C-1DAD18694C9B_OUT launchd-827.5LXWL9
launch-I73gqi                            stupidlang


### Asyncio in 3.5

Everything we have done so far continues to work in Python 3.5, but you can now write `await` instead of `yield from`, and `async def` finally provides dedicated syntax that marks a function as a coroutine, with no decorator needed.
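
A small sketch of the 3.5 syntax, assuming Python 3.7+ for `asyncio.run`. `inspect.iscoroutinefunction` shows that `async def` alone marks `fetch` as a coroutine function, and an `as_completed` loop reads the same as in the earlier cells with `await` in place of `yield from`. The `fetch` helper is hypothetical and only sleeps.

```python
import asyncio
import inspect


async def fetch(delay, value):
    # Hypothetical stand-in for a network call: sleep, then return a value
    await asyncio.sleep(delay)
    return value


def plain(x):
    return x


async def main():
    coros = [fetch(0.05, "slow"), fetch(0.01, "fast"), fetch(0.03, "medium")]
    results = []
    # as_completed yields awaitables in completion order, not submission order
    for next_done in asyncio.as_completed(coros):
        results.append(await next_done)
    return results


print(inspect.iscoroutinefunction(fetch), inspect.iscoroutinefunction(plain))
order = asyncio.run(main())
print(order)
```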

In [17]:

import asyncio
import uuid
from contextlib import closing

import aiohttp
from aiohttp import web


# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
# (not used in this cell; they are meant to cap concurrent requests)
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):
    def __init__(self, url):
        super().__init__(url)
        self.url = url


async def get_url_async(url):
    # early aiohttp API: awaiting aiohttp.request() returns the response
    with closing(await aiohttp.request('GET', url)) as resp:
        if resp.status == 200:
            print("1", url)
            content = await resp.read()
            return content
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)

def save_file(content):
    # blocking file I/O; content is bytes, so write it in binary mode
    fname = str(uuid.uuid4())
    with open("/tmp/dload-" + fname + '.html', 'wb') as fd:
        fd.write(content)
    return fname

async def download_and_save_one(url):
    try:
        c = await get_url_async(url)
    except web.HTTPNotFound:
        return None  # nothing to save for a 404
    except Exception as exc:
        raise FetchError(url) from exc
    else:
        loop = asyncio.get_event_loop()
        # await the executor future so the save finishes and its errors surface
        await loop.run_in_executor(None, save_file, c)
    return c

async def get_many_async(urls):
    to_do = [download_and_save_one(url) for url in urls]

    to_do_iter = asyncio.as_completed(to_do)
    results = []
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            print("error", exc.url, repr(exc.__cause__))
        else:
            if res is not None:  # skip 404s
                results.append(res)

    return results


def download_many(urls):
    loop = asyncio.get_event_loop()
    coro = get_many_async(urls)
    clist = loop.run_until_complete(coro)
    return clist

download(download_many, trial_urls)

1 http://localhost:8000/28
1 http://localhost:8000/36
1 http://localhost:8000/42
1 http://localhost:8000/24
1 http://localhost:8000/4
1 http://localhost:8000/35
1 http://localhost:8000/23
1 http://localhost:8000/39
1 http://localhost:8000/43
1 http://localhost:8000/37
1 http://localhost:8000/40
1 http://localhost:8000/17
1 http://localhost:8000/15
1 http://localhost:8000/10
1 http://localhost:8000/8
1 http://localhost:8000/45
1 http://localhost:8000/6
1 http://localhost:8000/3
1 http://localhost:8000/11
1 http://localhost:8000/22
1 http://localhost:8000/7
1 http://localhost:8000/46
1 http://localhost:8000/33
1 http://localhost:8000/2
1 http://localhost:8000/25
1 http://localhost:8000/38
1 http://localhost:8000/47
1 http://localhost:8000/14
1 http://localhost:8000/29
1 http://localhost:8000/9
1 http://localhost:8000/27
1 http://localhost:8000/48
1 http://localhost:8000/12
1 http://localhost:8000/13
1 http://localhost:8000/26
1 http://localhost:8000/31
1 http://localhost:8000/49
1 http:/
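
The error handling in the cell above follows a common pattern: wrap low-level failures in one application exception with `raise ... from`, then catch it per future inside the `as_completed` loop so one bad URL does not stop the rest. A stdlib-only sketch, where `fake_get` is a hypothetical stand-in for `get_url_async` and this `FetchError` stores the URL:

```python
import asyncio


class FetchError(Exception):
    def __init__(self, url):
        super().__init__(url)
        self.url = url


async def fake_get(url):
    # Hypothetical stand-in for get_url_async: URLs ending in "bad" fail
    await asyncio.sleep(0)
    if url.endswith("bad"):
        raise ConnectionError("connection refused")
    return url.upper()


async def fetch_one(url):
    try:
        return await fake_get(url)
    except Exception as exc:
        # Chain the low-level error so it stays available as __cause__
        raise FetchError(url) from exc


async def main(urls):
    ok, failed = [], []
    for fut in asyncio.as_completed([fetch_one(u) for u in urls]):
        try:
            ok.append(await fut)
        except FetchError as exc:
            # One failing download does not abort the others
            failed.append((exc.url, type(exc.__cause__).__name__))
    return ok, failed


ok, failed = asyncio.run(main(["a", "b-bad", "c"]))
print(sorted(ok), failed)
# ['A', 'C'] [('b-bad', 'ConnectionError')]
```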

In [18]:
!rm /tmp/dload-*.html; ls /tmp

569fc150184ba                            launch-NAkmmJ
569fc1501cc12                            launch-YzQTyI
56d64375dbf85                            launchd-157.m3nlro
Atom Crashes                             launchd-3256.sO9Kik
B92A7692-2496-452C-AC4C-1DAD18694C9B_IN  launchd-42570.yMUzAO
B92A7692-2496-452C-AC4C-1DAD18694C9B_OUT launchd-827.5LXWL9
launch-I73gqi                            stupidlang
