Now we are going to look at how Python can be used for concurrent programming. Even though Python is somewhat limited by the GIL (Global Interpreter Lock), it still offers a basic level of concurrency support.

Concurrency is supported in Python via the following concepts:
1. `threads`
2. `asyncio`
3. `process`


We'll look at an example where downloading items from the internet can be made faster with threads.

In [4]:
import os
import time
import sys
from pathlib import Path

import requests

In [11]:
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()

In [10]:
BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = Path('download/')
if not DEST_DIR.exists():
    DEST_DIR.mkdir()
    print('download dir created!')

In [26]:
def save_flag(img, filename):
    path = DEST_DIR/filename
    with open(path, 'wb') as fp:
        fp.write(img)
        
def get_flag(cc):
    url = f"{BASE_URL}/{cc.lower()}/{cc.lower()}.gif"
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()
    
def download_many(cc_list):
    for cc in sorted(cc_list):
        img = get_flag(cc)
        show(cc)
        save_flag(img, cc.lower()+'.gif')
        
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    

In [27]:
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 4.61s


### Downloading with concurrent.futures

The main features of `concurrent.futures` are `ThreadPoolExecutor` and `ProcessPoolExecutor`. These abstract the inner workings of threads and processes so we can work with a simple API.

In [22]:
from concurrent import futures
MAX_WORKERS = 20

def download_one(cc):
    img = get_flag(cc)
    show(cc)
    save_flag(img, cc.lower()+'.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
        
    return len(list(res))

In [25]:
main(download_many)

BR BD EG FR ET CD IN DE ID MX NGRU  IRCN  PHJPTR   US VN PK 
20 flags downloaded in 0.59s


As you can see, there is a major speedup simply from using concurrency: the same 20 downloads took 0.59s instead of 4.61s.

### Where are the Futures?

Futures encapsulate pending operations so that they can be put in queues, their state of completion can be queried, and their results (or exceptions) can be retrieved when available.

They are similar to the `Promise` object in JavaScript.

Both `concurrent.futures` and `asyncio` provide a `Future` class: `concurrent.futures.Future` and `asyncio.Future`. Both of them support `.done()`, `.add_done_callback()` and `.result()`.

Several functions in both libraries return futures; others use them in their implementations but most of these are hidden from the user.
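
The three methods mentioned above can be tried out on a single future. The following is a minimal sketch using `concurrent.futures` only; `slow_square` and `callback_log` are hypothetical names made up for this illustration and are not part of the flag download example.

```python
import time
from concurrent import futures


def slow_square(n):
    """Hypothetical stand-in for a slow operation."""
    time.sleep(0.1)
    return n * n


callback_log = []

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)
    # The callback is called with the finished future as its only argument.
    future.add_done_callback(lambda f: callback_log.append(f.result()))
    # .done() does not block; it only reports whether the call has finished.
    print('done right after submit?', future.done())
    # .result() blocks until the result is available.
    value = future.result()

# Leaving the `with` block waits for the worker thread to finish,
# so by now the callback has run as well.
print(future.done(), value, callback_log)
```

Note that `.result()` re-raises any exception raised inside the callable, so this is also where errors from the worker surface.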


To get a practical look into futures we'll rewrite the example above with futures.

In [30]:
def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = f'Scheduled for {cc}: {future}'
            print(msg)
            
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            print(f'{future} result :{res}')
            results.append(res)
            
        return len(results)

In [31]:
main(download_many)

Scheduled for BR: <Future at 0x7f68251fced0 state=running>
Scheduled for CN: <Future at 0x7f6824bb28d0 state=running>
Scheduled for ID: <Future at 0x7f6824b0c390 state=running>
Scheduled for IN: <Future at 0x7f682517ced0 state=pending>
Scheduled for US: <Future at 0x7f682517c690 state=pending>
CN BR <Future at 0x7f6824bb28d0 state=finished returned str> result :CN
<Future at 0x7f68251fced0 state=finished returned str> result :BRID 
<Future at 0x7f6824b0c390 state=finished returned str> result :ID
US <Future at 0x7f682517c690 state=finished returned str> result :US
IN <Future at 0x7f682517ced0 state=finished returned str> result :IN

5 flags downloaded in 0.58s


Strictly speaking, we are still downloading within a single process, and because of the GIL only one thread executes Python bytecode at a time. But we still get a good boost because this is an I/O-bound operation.

For blocking I/O operations the Python interpreter releases the GIL, which means other threads can execute while one waits. Even functions like `time.sleep()` release the GIL.
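
A quick way to see the effect of `time.sleep()` releasing the GIL is to time the same waits sequentially and in a thread pool. This is only a sketch: `fake_io` is a hypothetical stand-in for a blocking network call, and the exact timings will vary from machine to machine.

```python
import time
from concurrent import futures


def fake_io(delay):
    """Hypothetical stand-in for a blocking I/O call."""
    time.sleep(delay)  # the GIL is released while this thread sleeps
    return delay


delays = [0.2] * 5

# Run the waits one after another.
t0 = time.perf_counter()
sequential = [fake_io(d) for d in delays]
sequential_time = time.perf_counter() - t0

# Run the same waits in five threads.
t0 = time.perf_counter()
with futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
    threaded = list(executor.map(fake_io, delays))
threaded_time = time.perf_counter() - t0

print(f'sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s')
```

The sequential run has to wait for every sleep in turn, while the threaded run overlaps them, so it should take roughly as long as a single sleep.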

However if you want to leverage all the CPU cores you can use `ProcessPoolExecutor`. 

In [34]:
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
    
    return len(list(res))

In [35]:
main(download_many)

BDBRCNCD    EGDEET   FR IDIN  JP IR MXNG  PK PH RU TRUS  VN 
20 flags downloaded in 1.85s


As you may have noticed, this is not as effective as threads here, mainly because `ProcessPoolExecutor` defaults to one worker per CPU and my system has only 4, while the threaded version used 20 workers. For pure CPU-intensive tasks, however, you may want to consider PyPy instead, which often gives much better performance.

## Experimenting with Executor.map
The simplest way to run several callables concurrently is the `Executor.map` method. Here we'll look into how it behaves.

In [8]:
from time import sleep, strftime
from concurrent import futures

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)
    
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n*10

def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(10))
    display('resutls', results)
    display('Waiting for individual results')
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))
        
    

In [9]:
main()

[13:26:03] Script starting
[13:26:03] loiter(0): doing nothing for 0s
[13:26:03] loiter(0): done.
[13:26:03][13:26:03] 	loiter(1): doing nothing for 1s
[13:26:03] resutls <generator object Executor.map.<locals>.result_iterator at 0x7f9dac5e25d0>
[13:26:03] Waiting for individual results
[13:26:03] result 0: 0
[13:26:03] 			loiter(3): doing nothing for 3s
 		loiter(2): doing nothing for 2s
[13:26:04] 	loiter(1): done.
[13:26:04] 				loiter(4): doing nothing for 4s
[13:26:04] result 1: 10
[13:26:05] 		loiter(2): done.
[13:26:05] 					loiter(5): doing nothing for 5s
[13:26:05] result 2: 20
[13:26:06] 			loiter(3): done.
[13:26:06] 						loiter(6): doing nothing for 6s
[13:26:06] result 3: 30
[13:26:08] 				loiter(4): done.
[13:26:08] 							loiter(7): doing nothing for 7s
[13:26:08] result 4: 40
[13:26:10] 					loiter(5): done.
[13:26:10] [13:26:10] result 5: 50
								loiter(8): doing nothing for 8s
[13:26:12] 						loiter(6): done.
[13:26:12] 									loiter(9): doing nothing for 9s
[1

The `Executor.map` function is easy to use but it has a feature that may or may not be helpful, depending on your needs: it returns the results in exactly the same order as the calls are started. If the first call takes 10s to produce a result, and the others take 1s each, your code will block for 10s as it tries to retrieve the first result of the generator returned by `map`. After that, you'll get the remaining results without blocking because they will be done. That's OK when you must have all the results before proceeding, but often it's preferable to get the results as they are ready, regardless of the order they were submitted.

When you do need the results as they become ready, use a combination of the `Executor.submit` method and the `futures.as_completed` function, as in the futures-based `download_many` shown earlier.
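
The difference between the two approaches can be seen side by side in a small sketch. `wait_then_return` is a hypothetical helper that just sleeps, and the delays are chosen so that the first call submitted is the slowest one.

```python
import time
from concurrent import futures


def wait_then_return(delay):
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


delays = [0.6, 0.2, 0.4]  # the first call is the slowest

# Executor.map yields results in the order the calls were made.
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    in_call_order = list(executor.map(wait_then_return, delays))

# submit + as_completed yields each future as soon as it finishes.
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    to_do = [executor.submit(wait_then_return, d) for d in delays]
    in_completion_order = [f.result() for f in futures.as_completed(to_do)]

print('map:         ', in_call_order)
print('as_completed:', in_completion_order)
```

With `map`, nothing is returned until the 0.6s call is done, whereas `as_completed` hands back the shorter tasks first.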

### Improved Script for flag download.

Now let's try out an improved version of the flag download script that has better error handling and a nice progress bar.

In [13]:
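## flags common
from collections import namedtuple
from enum import Enum

# `ok` is needed because download_one() below uses HTTPStatus.ok
HTTPStatus = Enum('status', 'ok not_found')

# The original cell was cut off here; the field names are an assumption
# based on the later call Result(status, cc)
Result = namedtuple('Result', 'status cc')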

In [11]:
import requests


def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:  # requests exposes the code as status_code
        resp.raise_for_status()
        
    return resp.content

def download_one(cc, base_url, verbose=False):
    try: 
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:
            raise
    else:
        save_flag(image, cc.lower()+'.gif')
        status = HTTPStatus.ok
        msg = 'OK'
    
    if verbose:
        print(cc, msg)
        
    return Result(status, cc)

In [16]:
HTTPStatus

<enum 'status'>
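
To give an idea of how per-download error handling could fit together with futures, here is a minimal sketch that does not touch the network. `fake_download` and `download_all` are hypothetical names standing in for `download_one` and a driver function, and `'XX'` is a made-up country code used to trigger a failure; this is one possible arrangement, not the finished script.

```python
from collections import Counter
from concurrent import futures


def fake_download(cc):
    """Hypothetical stand-in for download_one; fails for the code 'XX'."""
    if cc == 'XX':
        raise ValueError(f'no such country code: {cc}')
    return cc


def download_all(cc_list, max_workers=3):
    counter = Counter()
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the country code it was created for.
        to_do = {executor.submit(fake_download, cc): cc for cc in cc_list}
        for future in futures.as_completed(to_do):
            cc = to_do[future]
            try:
                future.result()  # re-raises any exception from the worker
            except ValueError as exc:
                counter['error'] += 1
                print(f'{cc} failed: {exc}')
            else:
                counter['ok'] += 1
    return counter


counts = download_all(['BR', 'CN', 'XX', 'IN'])
print(counts)
```

Keeping a dict from future to country code makes it possible to report which download failed, since `as_completed` returns the futures in an unpredictable order.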