<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Chapter-17.-Concurrency-with-Futures" data-toc-modified-id="Chapter-17.-Concurrency-with-Futures-1">Chapter 17. Concurrency with Futures</a></span><ul class="toc-item"><li><span><a href="#A-Sequential-Download-Script" data-toc-modified-id="A-Sequential-Download-Script-1.1">A Sequential Download Script</a></span><ul class="toc-item"><li><span><a href="#Downloading-with-concurrent.futures" data-toc-modified-id="Downloading-with-concurrent.futures-1.1.1">Downloading with concurrent.futures</a></span></li><li><span><a href="#replacing-executor.map-with-executor.submit-and-futures.as_completed-in-the-download_many-function" data-toc-modified-id="replacing-executor.map-with-executor.submit-and-futures.as_completed-in-the-download_many-function-1.1.2">replacing executor.map with executor.submit and futures.as_completed in the download_many function</a></span></li><li><span><a href="#Downloading-with-asyncio-and-aiohttp" data-toc-modified-id="Downloading-with-asyncio-and-aiohttp-1.1.3">Downloading with asyncio and aiohttp</a></span></li></ul></li><li><span><a href="#The-GIL" data-toc-modified-id="The-GIL-1.2">The GIL</a></span><ul class="toc-item"><li><span><a href="#The-GIL-for-I/O-bound-processing" data-toc-modified-id="The-GIL-for-I/O-bound-processing-1.2.1">The GIL for I/O bound processing</a></span></li><li><span><a href="#The-GIL-for-CPU-bound-processing" data-toc-modified-id="The-GIL-for-CPU-bound-processing-1.2.2">The GIL for CPU bound processing</a></span></li><li><span><a href="#Simple-demonstration-of-the-map-method-of-ThreadPoolExecutor" data-toc-modified-id="Simple-demonstration-of-the-map-method-of-ThreadPoolExecutor-1.2.3">Simple demonstration of the map method of ThreadPoolExecutor</a></span></li></ul></li></ul></li><li><span><a href="#Chapter-18.-Concurrency-with-asyncio" data-toc-modified-id="Chapter-18.-Concurrency-with-asyncio-2">Chapter 18. 
Concurrency with asyncio</a></span><ul class="toc-item"><li><span><a href="#Spinner-thread-using-threading-module" data-toc-modified-id="Spinner-thread-using-threading-module-2.1">Spinner thread using threading module</a></span></li><li><span><a href="#Spinner-thread-using-asyncio" data-toc-modified-id="Spinner-thread-using-asyncio-2.2">Spinner thread using asyncio</a></span></li><li><span><a href="#Main-differences-between-Thread-and-Task" data-toc-modified-id="Main-differences-between-Thread-and-Task-2.3">Main differences between Thread and Task</a></span><ul class="toc-item"><li><span><a href="#How-the-asyncio.Future-class-differs-from-the-concurrent.futures.Future-class" data-toc-modified-id="How-the-asyncio.Future-class-differs-from-the-concurrent.futures.Future-class-2.3.1">How the asyncio.Future class differs from the concurrent.futures.Future class</a></span></li></ul></li></ul></li></ul></div>

# Chapter 17. Concurrency with Futures

## A Sequential Download Script

In [4]:
import os
import time
import sys

import requests   

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()   

BASE_URL = 'http://flupy.org/data/flags'   

DEST_DIR = 'downloads/'   


def save_flag(img, filename):   
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def get_flag(cc):   
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


def show(text):
    # print(text, end=' ')
    sys.stdout.flush()


def download_many(cc_list):   
    for cc in sorted(cc_list):   
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)

    
def main(download_many):   
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)   



20 flags downloaded in 1.24s


### Downloading with concurrent.futures
An important thing to know about futures in general is that you and I should not create them: they are meant to be instantiated exclusively by the concurrency framework, be it concurrent.futures or asyncio. 
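
As a minimal sketch of that rule, the snippet below never calls `futures.Future()` directly: the future comes back from `executor.submit`, and the code only inspects it. The `square` function and the `seen_by_callback` list are hypothetical names used for illustration only.

```python
from concurrent import futures


def square(n):
    """Hypothetical job used only for this illustration."""
    return n * n


seen_by_callback = []

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    # submit() creates the Future; we never instantiate one ourselves.
    future = executor.submit(square, 7)
    # The callback receives the finished future as its only argument.
    future.add_done_callback(lambda f: seen_by_callback.append(f.result()))
    value = future.result()  # blocks until the worker thread has finished

print(type(future).__name__, future.done(), value)
```

Client code normally limits itself to reading a future (`done()`, `result()`, `add_done_callback()`); changing its state is left to the executor.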

In [235]:
from concurrent import futures

# Maximum number of threads to be used in the ThreadPoolExecutor
MAX_WORKERS = 20   


def download_one(cc):   
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))   
    with futures.ThreadPoolExecutor(workers) as executor:   
        res = executor.map(download_one, sorted(cc_list))   

    return len(list(res))   


main(download_many)   


20 flags downloaded in 0.10s


### replacing executor.map with executor.submit and futures.as_completed in the download_many function

In [236]:

def download_many(cc_list):
    # cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            # executor.submit() takes a callable, schedules it to run,
            # and returns a future.
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))

        results = []
        # futures.as_completed() takes an iterable of futures and
        # returns an iterator that yields each future as it completes.
        for future in futures.as_completed(to_do):
            res = future.result()   
            msg = '{} result: {!r}'
            print(msg.format(future, res))  
            results.append(res)

    return len(results)

main(download_many)   

Scheduled for BD: <Future at 0x7f9fcb3712b0 state=running>
Scheduled for BR: <Future at 0x7f9fcb38c5e0 state=running>
Scheduled for CD: <Future at 0x7f9fcb38c820 state=running>
Scheduled for CN: <Future at 0x7f9fcb38cfd0 state=pending>
Scheduled for DE: <Future at 0x7f9fcb37a190 state=pending>
Scheduled for EG: <Future at 0x7f9fcb37acd0 state=pending>
Scheduled for ET: <Future at 0x7f9fcb37afa0 state=pending>
Scheduled for FR: <Future at 0x7f9fcb37a070 state=pending>
Scheduled for ID: <Future at 0x7f9fcb352760 state=pending>
Scheduled for IN: <Future at 0x7f9fcb352280 state=pending>
Scheduled for IR: <Future at 0x7f9fcb352250 state=pending>
Scheduled for JP: <Future at 0x7f9fcb352580 state=pending>
Scheduled for MX: <Future at 0x7f9fcb352b50 state=pending>
Scheduled for NG: <Future at 0x7f9fcb352910 state=pending>
Scheduled for PH: <Future at 0x7f9fcb38c4c0 state=pending>
Scheduled for PK: <Future at 0x7f9fcb371460 state=pending>
Scheduled for RU: <Future at 0x7f9fcb37ae20 state=pendin
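
To see the effect of `futures.as_completed` without depending on the network, here is a minimal sketch that replaces the downloads with `time.sleep`. The helpers `nap` and `completion_order` are hypothetical names, not part of the download script. The results come back in the order the jobs finish, not the order they were submitted.

```python
import time
from concurrent import futures


def nap(delay):
    """Hypothetical stand-in for a download: sleep, then return the delay."""
    time.sleep(delay)
    return delay


def completion_order(delays):
    """Submit one nap per delay and collect results as they finish."""
    with futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        to_do = [executor.submit(nap, d) for d in delays]
        return [future.result() for future in futures.as_completed(to_do)]


# With one worker per job, the shortest nap should finish first.
print(completion_order([0.5, 0.1, 0.3]))
```

By contrast, `executor.map` would return these results in submission order, even though the jobs still finish in a different order.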

### Downloading with asyncio and aiohttp

In [237]:
import asyncio

import aiohttp


async def get_flag(session, cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    # `async with` releases the connection once the body has been read.
    async with session.get(url) as resp:
        return await resp.read()


async def download_one(session, cc):
    image = await get_flag(session, cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def download_coro(cc_list):
    # One shared session, closed by `async with`, avoids the
    # "Unclosed client session" warnings.
    async with aiohttp.ClientSession() as session:
        to_do = [download_one(session, cc) for cc in sorted(cc_list)]
        res = await asyncio.gather(*to_do)
    return len(res)


def download_many(cc_list):
    # asyncio.run() creates and closes its own event loop, so this works
    # when run as a script. Jupyter already has a loop running, so both
    # asyncio.run() and loop.run_until_complete() raise RuntimeError there
    # ("This event loop is already running"); in a notebook cell, use
    # `await download_coro(POP20_CC)` instead.
    return asyncio.run(download_coro(cc_list))


if __name__ == '__main__':
    main(download_many)

## The GIL
The CPython interpreter is not thread-safe internally, so it has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecodes. That’s why a single Python process usually cannot use multiple CPU cores at the same time.

### The GIL for I/O bound processing
The GIL is nearly harmless with I/O-bound processing. All standard library functions that perform blocking I/O release the GIL when waiting for a result from the OS. This means Python programs that are I/O bound can benefit from using threads at the Python level. While one Python thread is waiting for a response from the network, the blocked I/O function releases the GIL so another thread can run.
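
A rough way to observe this is to time a blocking call run sequentially and then in threads. The sketch below uses `time.sleep` as a stand-in for network I/O (it releases the GIL while waiting); `fake_io`, `timed`, and `DELAYS` are hypothetical names chosen for the illustration.

```python
import time
from concurrent import futures

DELAYS = [0.2] * 5  # five simulated requests of 0.2s each


def fake_io(delay):
    """Stand-in for a blocking network call; sleeping releases the GIL."""
    time.sleep(delay)
    return delay


def timed(fn):
    """Return how long fn() takes, in seconds."""
    t0 = time.perf_counter()
    fn()
    return time.perf_counter() - t0


def run_sequential():
    return [fake_io(d) for d in DELAYS]


def run_threaded():
    with futures.ThreadPoolExecutor(max_workers=len(DELAYS)) as executor:
        return list(executor.map(fake_io, DELAYS))


sequential_time = timed(run_sequential)
threaded_time = timed(run_threaded)
print('sequential: {:.2f}s, threaded: {:.2f}s'.format(sequential_time,
                                                      threaded_time))
```

The sequential run has to wait for each sleep in turn, while the threaded run overlaps the waits, so it should finish in roughly the time of the longest single call.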

### The GIL for CPU bound processing

For CPU-bound work, you need to sidestep the GIL by launching multiple processes. The futures.ProcessPoolExecutor is the easiest way to do it. 

The multiprocessing package emulates the threading API but delegates jobs to multiple processes. It also offers facilities to solve the biggest challenge faced by collaborating processes: how to pass data around.
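
A minimal sketch of the process-based approach, assuming a small CPU-bound job: the helper `factorials_in_processes` is a hypothetical name, and `math.factorial` stands in for real work. Arguments and results cross the process boundary by pickling, which is one answer to the question of how to pass data around.

```python
import math
from concurrent import futures


def factorials_in_processes(numbers, max_workers=2):
    """Compute n! for each n in separate worker processes."""
    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Arguments and results are pickled to cross the process boundary.
        return list(executor.map(math.factorial, numbers))


if __name__ == '__main__':
    # The guard matters: worker processes may re-import this module.
    print(factorials_in_processes([5, 10, 20]))
```

The interface mirrors `ThreadPoolExecutor`, so switching between threads and processes is mostly a matter of changing the executor class, provided the callable and its data can be pickled.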

### Simple demonstration of the map method of ThreadPoolExecutor

In [245]:
from time import sleep, strftime
from concurrent import futures

def display(*args):   
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):   
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10   


def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)   
    results = executor.map(loiter, range(5))   
    display('results:', results)  # results is a generator, not the values yet
    display('Waiting for individual results:')
    for i, result in enumerate(results):   
        display('result {}: {}'.format(i, result))


main()


[13:42:51] Script starting.
[13:42:51][13:42:51] 	loiter(1): doing nothing for 1s...
 loiter(0): doing nothing for 0s...
[13:42:51] loiter(0): done.
[13:42:51] 		loiter(2): doing nothing for 2s...
[13:42:51][13:42:51] results: <generator object Executor.map.<locals>.result_iterator at 0x7f9fcb2d1200>
[13:42:51] Waiting for individual results:
[13:42:51] result 0: 0
 			loiter(3): doing nothing for 3s...
[13:42:52] 	loiter(1): done.
[13:42:52] 				loiter(4): doing nothing for 4s...
[13:42:52] result 1: 10
[13:42:53] 		loiter(2): done.
[13:42:53] result 2: 20
[13:42:54] 			loiter(3): done.
[13:42:54] result 3: 30
[13:42:56] 				loiter(4): done.
[13:42:56] result 4: 40
