References:
- https://docs.python.org/3/library/concurrent.futures.html
- Fluent Python by Luciano Ramalho. Chapter 17: Concurrency with Futures

# Example:  Web Ddownloads in 3 Styles
## A Sequential Download Script

In [1]:
import os
import time
import sys

import requests

In [2]:
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

In [3]:
BASE_URL = 'http://flupy.org/data/flags'

In [4]:
DEST_DIR = 'downloads/'

In [5]:
def save_flag(img, filename):  # <5>
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

In [6]:
def get_flag(cc):  # <6>
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

In [7]:
def show(text):  # <7>
    print(text, end=' ')
    sys.stdout.flush()

In [8]:
def download_many(cc_list):  # <8>
    for cc in sorted(cc_list):  # <9>
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)

In [9]:
def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

In [10]:
main()

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 23.04s


## Downloading with `concurrent.futures`

In [11]:
from concurrent import futures

In [12]:
MAX_WORKERS = 20

In [13]:
def download_one(cc): 
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

In [14]:
def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))  # set the min number of max workers
    with futures.ThreadPoolExecutor(workers) as executor:  # instantiate
        res = executor.map(download_one, sorted(cc_list))  # returns a seperate generator for each thread

    return len(list(res)) # return the number of results

In [15]:
main()

RU NG BD DE FR CN ID PK US TR CD VN JP IRMXIN   ET BR EG PH 
20 flags downloaded in 1.77s


## Where are the futures

Futures encapsulate pending operations so that they can be put in queues.

Developers should not create futures, they are meant to be instantiated exclusively by thee concurrency framework.

In [16]:
def download_many(cc_list):
    cc_list = cc_list[:5]  # top 5 most populous countries
    with futures.ThreadPoolExecutor(max_workers=3) as executor: 
        to_do = []
        for cc in sorted(cc_list):  # to show results arrive out of alphabetical order
            future = executor.submit(download_one, cc)  # schedules the callable and returns a future
            to_do.append(future)  # store each future
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # display country code and respoective future

        results = []
        for future in futures.as_completed(to_do):  # yields futures as they are completed
            res = future.result()  # get the result 
            msg = '{} result: {!r}'
            print(msg.format(future, res)) # display
            results.append(res)

In [17]:
main()

Scheduled for BR: <Future at 0x103aab8d0 state=running>
Scheduled for CN: <Future at 0x103acbb10 state=running>
Scheduled for ID: <Future at 0x103ae3290 state=running>
Scheduled for IN: <Future at 0x103aab910 state=pending>
Scheduled for US: <Future at 0x103ad4850 state=pending>
BRCN  <Future at 0x103acbb10 state=finished returned str> result: 'CN'
<Future at 0x103aab8d0 state=finished returned str> result: 'BR'
ID <Future at 0x103ae3290 state=finished returned str> result: 'ID'
IN <Future at 0x103aab910 state=finished returned str> result: 'IN'
US <Future at 0x103ad4850 state=finished returned str> result: 'US'

None flags downloaded in 1.94s


#  Blocking I/O  and the  GIL
The CPython interpreter is not thread-safe internally, so it has a Global Interpreter Lock.

In [18]:
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor: 
        res = executor.map(download_one, sorted(cc_list)) 

    return len(list(res)) 

In [19]:
main()

BD BR CD CN DE EG ET FR IR ID JP IN MX NG PK PH US VN TR RU 
20 flags downloaded in 7.01s


# Experimenting with `Executor.map`

In [20]:
from time import sleep, strftime
from concurrent import futures

In [21]:
def display(*args):  # prints the arguments
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


In [22]:
def loiter(n):  
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10  

In [23]:
def demo_executor_map():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)  # Creates ThreadPoolExecutor
    results = executor.map(loiter, range(5))  # Submit 5 tasks with only 3 threads, non blocking
    display('results:', results)  
    display('Waiting for individual results:')
    for i, result in enumerate(results):  # the enumerator implicitly invoke next(results),  which  will invoke  _f.result() The result method will block until the future is done.
        display('result {}: {}'.format(i, result))

In [24]:
demo_executor_map()

[06:11:20] Script starting.
[06:11:20] [06:11:20]loiter(0): doing nothing for 0s... 
[06:11:20]	loiter(1): doing nothing for 1s... loiter(0): done.

[06:11:20] 		loiter(2): doing nothing for 2s...
[06:11:20][06:11:20] results: <generator object Executor.map.<locals>.result_iterator at 0x103b2f050>
[06:11:20] Waiting for individual results:
[06:11:20] 			loiter(3): doing nothing for 3s...
 result 0: 0
[06:11:21] 	loiter(1): done.
[06:11:21] 				loiter(4): doing nothing for 4s...
[06:11:21] result 1: 10
[06:11:22] 		loiter(2): done.
[06:11:22] result 2: 20
[06:11:23] 			loiter(3): done.
[06:11:23] result 3: 30
[06:11:25] 				loiter(4): done.
[06:11:25] result 4: 40


# Downloads with Progress Display and Error Handling

https://github.com/noamraph/tqdm/blob/master/README.md

