## Concurrency with Futures
This chapter focuses on the concurrent.futures library introduced in Python 3.2.

## A Sequential Download Script

In [6]:
import os
import time
import sys
import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    return len(cc_list)

def create_downloads_folder():
    # Create DEST_DIR if it is missing; exist_ok avoids an error when it already exists
    os.makedirs(DEST_DIR, exist_ok=True)

def main(download_many):
    create_downloads_folder()
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)


BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 18.58s


This serves as a baseline for comparing the other scripts. 

## Downloading with concurrent.futures
The main features of the concurrent.futures package are the ThreadPoolExecutor and ProcessPoolExecutor classes.

In [8]:
from concurrent import futures

MAX_WORKERS = 20

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

if __name__ == '__main__':
    main(download_many)

BD ID DEEGFR CN JPRUBR TR  MX INVNNG ET    IRPKUSCD PH      
20 flags downloaded in 1.42s


That's a substantial improvement in download speed: 1.42s instead of 18.58s. The easiest way to run the downloads concurrently is the ThreadPoolExecutor.map method, as used here.

## Where Are the Futures?
As of Python 3.4, there are two classes named Future in the standard library: concurrent.futures.Future and asyncio.Future. They serve the same purpose: an instance of either Future class represents a deferred computation that may or may not have completed. This is similar to the Deferred class in Twisted, the Future class in Tornado, and Promise objects in various JavaScript libraries.
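
To make the idea of a deferred computation concrete, here is a minimal sketch that obtains a concurrent.futures.Future directly through Executor.submit. The slow_square function and its delay are made up for illustration; they are not part of the flag scripts.

```python
import time
from concurrent import futures


def slow_square(n):
    """Hypothetical task: pretend to work, then return n squared."""
    time.sleep(0.1)
    return n * n


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules the call and immediately returns a Future
    future = executor.submit(slow_square, 7)
    print(type(future).__name__)    # Future
    # result() blocks until the computation has finished
    value = future.result()
    print(future.done(), value)     # True 49
```

Client code normally does not create Future instances itself; the executor creates them when work is scheduled.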

Strictly speaking, none of the concurrent scripts we tested so far can perform downloads in parallel. The concurrent.futures examples are limited by the GIL, and the flags_asyncio.py script is single-threaded.

## Blocking I/O and the GIL
The CPython interpreter is not thread-safe internally, so it has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecodes. That’s why a single Python process usually cannot use multiple CPU cores at the same time.

However, all standard library functions that perform blocking I/O release the GIL when waiting for a result from the OS. This means Python programs that are I/O bound can benefit from using threads at the Python level: while one Python thread is waiting for a response from the network, the blocked I/O function releases the GIL so another thread can run.
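
The effect can be observed without any network access. The sketch below uses time.sleep as a stand-in for a blocking I/O call, since it also releases the GIL while waiting. The names blocked_task, DELAY and TASKS are arbitrary choices for this illustration.

```python
import time
from concurrent import futures

DELAY = 0.2   # seconds each task spends blocked (arbitrary choice)
TASKS = 5


def blocked_task(n):
    """Stand-in for a blocking I/O call: time.sleep releases the GIL."""
    time.sleep(DELAY)
    return n


def timed(run):
    """Call run() and return its result together with the elapsed time."""
    t0 = time.perf_counter()
    result = run()
    return result, time.perf_counter() - t0


def run_sequential():
    return [blocked_task(n) for n in range(TASKS)]


def run_threaded():
    with futures.ThreadPoolExecutor(max_workers=TASKS) as executor:
        return list(executor.map(blocked_task, range(TASKS)))


seq_result, seq_elapsed = timed(run_sequential)
thr_result, thr_elapsed = timed(run_threaded)
print('sequential: {:.2f}s'.format(seq_elapsed))
print('threaded:   {:.2f}s'.format(thr_elapsed))
```

The sequential run has to wait for every delay in turn, while the threaded run overlaps the waits, so its total time should stay close to a single DELAY.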

That’s why David Beazley says: 'Python threads are great at doing nothing.'

## Launching Processes with concurrent.futures
Both ProcessPoolExecutor and ThreadPoolExecutor implement the generic Executor interface, so it’s very easy to switch from a thread-based to a process-based solution using concurrent.futures.

In [12]:
import math
from concurrent import futures

MAX_WORKERS = 20

input_list = range(20)


def run_thread_pool(input_list):
    workers = min(MAX_WORKERS, len(input_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(math.sqrt, input_list)
    return res


def run_process_pool(input_list):
    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(math.sqrt, input_list)
    return res


In [13]:
%timeit run_thread_pool(input_list)

18 ms ± 3.39 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [14]:
%timeit run_process_pool(input_list)

547 ms ± 136 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


There is overhead in using separate processes, so for a trivial task such as math.sqrt the process pool is far slower than the thread pool. The value of ProcessPoolExecutor is in CPU-intensive jobs. max_workers is an optional argument in ProcessPoolExecutor, and most of the time we don’t use it: the default is the number of CPUs returned by os.cpu_count().
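
As a rough sketch of a job where processes can pay off, the example below runs a CPU-bound toy function on either executor class. The count_primes and run_pool functions and the workload size are assumptions made for this illustration, not code from the flag examples. The `if __name__ == '__main__'` guard matters for process pools on platforms that start workers by re-importing the main module.

```python
from concurrent import futures


def count_primes(limit):
    """CPU-bound toy workload: count primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_pool(executor_cls, limits):
    """Run count_primes on any Executor subclass; only the class changes."""
    with executor_cls() as executor:
        return list(executor.map(count_primes, limits))


if __name__ == '__main__':
    limits = [20_000] * 4
    # Processes sidestep the GIL, so each call can run on its own core.
    print(run_pool(futures.ProcessPoolExecutor, limits))
    # Same code with threads: correct, but the GIL serializes the bytecode.
    print(run_pool(futures.ThreadPoolExecutor, limits))
```

Whether the process pool actually wins depends on the number of cores and on how much work each call does relative to the cost of starting processes and passing data between them.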

## Experimenting with Executor.map


In [15]:
from time import sleep, strftime
from concurrent import futures

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))

if __name__ == "__main__":
    main()


[06:03:15] Script starting.
[06:03:15] loiter(0): doing nothing for 0s...
[06:03:15] loiter(0): done.
[06:03:15] 	loiter(1): doing nothing for 1s...
[06:03:15][06:03:15] 		loiter(2): doing nothing for 2s...
[06:03:15] results: 			loiter(3): doing nothing for 3s...
 <generator object Executor.map.<locals>.result_iterator at 0x0000000004EBEC50>
[06:03:15] Waiting for individual results:
[06:03:15] result 0: 0
[06:03:16] 	loiter(1): done.
[06:03:16][06:03:16] 				loiter(4): doing nothing for 4s...
 result 1: 10
[06:03:17] 		loiter(2): done.
[06:03:17] result 2: 20
[06:03:18] 			loiter(3): done.
[06:03:18] result 3: 30
[06:03:20] 				loiter(4): done.
[06:03:20] result 4: 40


In the next section, we will resume the flag download examples with new requirements that will force us to iterate over the results of futures.as_completed instead of using executor.map.
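
As a preview of the difference, here is a minimal sketch that runs the same tasks through both APIs. The nap function and the delays are invented for the illustration.

```python
import time
from concurrent import futures


def nap(seconds):
    """Hypothetical task: sleep, then report how long it slept."""
    time.sleep(seconds)
    return seconds


delays = [0.6, 0.2, 0.4]

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map yields results in the order the calls were submitted
    in_call_order = list(executor.map(nap, delays))

    # as_completed yields each future as soon as it finishes
    pending = [executor.submit(nap, d) for d in delays]
    in_finish_order = [f.result() for f in futures.as_completed(pending)]

print(in_call_order)     # same order as delays
print(in_finish_order)   # typically shortest delay first
```

With Executor.map, a slow first call delays every result behind it, as the loiter output above shows. futures.as_completed hands back whichever future finishes next, which suits progress reporting.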

## Downloads with Progress Display

In [17]:
! pip install tqdm

Collecting tqdm
  Downloading https://files.pythonhosted.org/packages/91/55/8cb23a97301b177e9c8e3226dba45bb454411de2cbd25746763267f226c2/tqdm-4.28.1-py2.py3-none-any.whl (45kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.28.1




In [7]:
import time
from tqdm import tqdm

for i in tqdm(range(100)):
    time.sleep(0.01)

100%|████████████████████████████████████████| 100/100 [00:01<00:00, 86.50it/s]


From the tqdm docs: "Instantly make your loops show a smart progress meter - just wrap any iterable with tqdm(iterable), and you’re done!"

## Error Handling in the flags2 Examples

In [8]:
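from collections import namedtuple
from enum import Enum

# Assumed helper definitions: the notebook uses these names without showing them
HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.content

def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:
            raise  # let the caller handle any other HTTP error
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    # Runs for both the 404 path and the success path
    if verbose:
        print(cc, msg)
    return Result(status, cc)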
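
When a function like download_one runs in a thread pool, exceptions it lets through surface again when the caller asks for the future's result. The sketch below shows one possible way to tally outcomes under that assumption. It replaces the network with a hypothetical fake_download function and uses plain strings as statuses, so none of these names come from the flags2 scripts.

```python
import collections
from concurrent import futures


def fake_download(cc):
    """Hypothetical stand-in for download_one: no network access."""
    if cc == 'XX':
        return 'not_found'                      # handled inside the worker
    if cc == 'ZZ':
        raise ConnectionError('network down')   # left for the caller
    return 'ok'


def download_many(cc_list, max_workers=3):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers) as executor:
        # Map each future back to its country code for error messages
        to_do = {executor.submit(fake_download, cc): cc for cc in cc_list}
        for future in futures.as_completed(to_do):
            cc = to_do[future]
            try:
                status = future.result()   # re-raises the worker's exception
            except ConnectionError as exc:
                print('*** Error for {}: {}'.format(cc, exc))
                status = 'error'
            counter[status] += 1
    return counter


tally = download_many(['BR', 'CN', 'XX', 'ZZ'])
print(tally)
```

Keeping a dict from future to country code is what makes the error message useful, because futures.as_completed returns futures in completion order rather than submission order.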


***