# Concurrent Executors

In [4]:
import time
from pathlib import Path
from typing import Callable

import httpx


# Twenty two-letter country codes; each one names a flag image to fetch.
POP20_CC = (
    'CN IN US ID BR PK NG BD RU JP '
    'MX PH VN ET EG DE IR TR CD FR'
).split()

# Root URL under which get_flag() requests '<cc>/<cc>.gif'.
BASE_URL = 'https://www.fluentpython.com/data/flags'

# Directory (relative to the current working directory) where flags are saved.
DEST_DIR = Path('downloaded')


def save_flag(img: bytes, filename: str) -> None:
    """Write the image bytes ``img`` to the file ``DEST_DIR / filename``.

    The destination directory is not created here; it must already exist.
    """
    target = DEST_DIR / filename
    target.write_bytes(img)


def get_flag(cc: str) -> bytes:
    """Fetch the flag image for country code ``cc`` and return its raw bytes.

    The complete URL is lower-cased before the request, so ``cc`` may be
    passed in any letter case. Redirects are followed and the request uses
    a 6.1 second timeout.

    Raises:
        Whatever ``httpx`` raises for a failed request, including the error
        raised by ``raise_for_status()`` for an error response.
    """
    flag_url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    response = httpx.get(flag_url, timeout=6.1, follow_redirects=True)
    # Fail loudly on an HTTP error status instead of returning an error page
    # as if it were image data.
    response.raise_for_status()
    return response.content
    

## A Sequential Download

In [5]:
def download_many(cc_list: list[str]) -> int:
    """Download and save the flags in ``cc_list`` one after another.

    Codes are processed in sorted order; each code is printed on the same
    line as soon as its flag has been saved.

    Returns:
        The number of country codes in ``cc_list``.
    """
    for cc in sorted(cc_list):
        # Fetch and store in one step; nothing else needs the image bytes.
        save_flag(get_flag(cc), f'{cc}.gif')
        print(cc, end=' ', flush=True)  # flush so progress shows immediately
    return len(cc_list)


def main(downloader: Callable[[list[str]], int]) -> None:
    """Run ``downloader`` over ``POP20_CC`` and report how long it took.

    Args:
        downloader: Callable that receives the list of country codes and
            returns the number of downloads it performed.
    """
    # Make sure the target directory exists before any flag is saved.
    DEST_DIR.mkdir(exist_ok=True)
    start = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - start
    print(f'\n{count} downloads in {elapsed:.2f}s')
    

In [7]:
# Time the sequential download_many defined above.
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 downloads in 36.17s


## Downloading with `concurrent.futures`

In [8]:
from concurrent import futures


def download_one(cc: str) -> str:
    """Download and save the flag for ``cc``, print the code, and return it.

    This is the unit of work handed to executor worker threads.
    """
    save_flag(get_flag(cc), f'{cc}.gif')
    print(cc, end=' ', flush=True)  # flush so progress shows immediately
    return cc


def download_many(cc_list: list[str]) -> int:
    """Download all flags in ``cc_list`` concurrently with a thread pool.

    Work is submitted in sorted order via ``Executor.map`` using the
    executor's default number of workers.

    Returns:
        The number of results produced, i.e. completed downloads.
    """
    ordered_codes = sorted(cc_list)
    with futures.ThreadPoolExecutor() as pool:
        results = pool.map(download_one, ordered_codes)
    # Leaving the `with` block waits for every worker to finish. Consuming
    # the iterator afterwards collects the results and re-raises any
    # exception that a download_one call raised.
    return len(list(results))
    

In [9]:
# Time the Executor.map-based download_many defined in the previous cell.
main(download_many)

BD JP CN CD IN ID ET EG NG PK TR RU PH FR IR DE MX VN US BR 
20 downloads in 3.47s


In [10]:
def download_many(cc_list: list[str]) -> int:
    """Download the first five flags of ``cc_list``, tracing each future.

    Demonstrates ``Executor.submit`` and ``futures.as_completed``: every
    future is printed when it is scheduled and again when it completes, so
    the output shows which tasks start running at once (three workers) and
    which stay pending.

    Args:
        cc_list: Country codes; only the first five are used, which keeps
            the traced output short.

    Returns:
        The number of futures that completed; 0 when ``cc_list`` is empty.

    Raises:
        Any exception raised inside ``download_one`` is re-raised by
        ``future.result()``.
    """
    demo_codes = cc_list[:5]
    # Initialize explicitly: with no futures the enumerate() loop below never
    # binds `count`, which used to raise UnboundLocalError on empty input.
    count = 0
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        todo: list[futures.Future] = []
        for cc in sorted(demo_codes):
            future = executor.submit(download_one, cc)
            todo.append(future)
            print(f'Scheduled for {cc}: {future}')

        # as_completed yields futures as they finish, not in submission order.
        for count, future in enumerate(futures.as_completed(todo), 1):
            res: str = future.result()
            print(f'{future}\tresult: {res!r}')
    return count


In [11]:
# Run the submit/as_completed variant of download_many from the previous cell.
main(download_many)

Scheduled for BR: <Future at 0x1996338a230 state=running>
Scheduled for CN: <Future at 0x19965121a80 state=running>
Scheduled for ID: <Future at 0x1996531ce20 state=running>
Scheduled for IN: <Future at 0x199652b9420 state=pending>
Scheduled for US: <Future at 0x199652b8fa0 state=pending>
ID <Future at 0x1996531ce20 state=finished returned str>	result: 'ID'
BR <Future at 0x1996338a230 state=finished returned str>	result: 'BR'
CN <Future at 0x19965121a80 state=finished returned str>	result: 'CN'
US <Future at 0x199652b8fa0 state=finished returned str>	result: 'US'
IN <Future at 0x199652b9420 state=finished returned str>	result: 'IN'

5 downloads in 2.55s


## Experimenting with `Executor.map`

In [19]:
from time import sleep, strftime
from concurrent import futures


def display(*args):
    """Print ``args`` like ``print`` does, prefixed with an ``[HH:MM:SS] `` timestamp."""
    timestamp = strftime('[%H:%M:%S] ')
    print(timestamp, end='')
    print(*args)


def loiter(n):
    """Sleep for ``n`` seconds, logging before and after; return ``n * 10``.

    Each message is indented with ``n`` tab characters so that the output of
    concurrent calls can be told apart.
    """
    indent = '\t' * n
    display('{}loiter({}): doing nothing for {}...'.format(indent, n, n))
    sleep(n)
    display('{}loiter({}): done. '.format(indent, n))
    return n * 10


def main():
    """Demonstrate how ``Executor.map`` schedules calls and yields results.

    Runs ``loiter(0)`` through ``loiter(4)`` on three worker threads and
    displays each result as it is retrieved. ``Executor.map`` yields results
    in the order the calls were submitted, so retrieving a result blocks
    until that particular call has finished.
    """
    display('Script starting. ')
    # Manage the executor with `with` so its worker threads are shut down
    # when the demo ends instead of being left behind.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        # map() returns right away; the calls start running in the background.
        results = executor.map(loiter, range(5))
        display('Waiting for individual results: ')
        for i, result in enumerate(results):
            display(f'result {i}: {result}')
    # `results` is a generator (already consumed by now), not a list.
    display('Results:', results)


# Run the demo only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()

[09:26:09] Script starting. 
[09:26:09] loiter(0): doing nothing for 0...
[09:26:09] loiter(0): done. 
[09:26:09] 	loiter(1): doing nothing for 1...
[09:26:09] 		loiter(2): doing nothing for 2...
[09:26:09] 			loiter(3): doing nothing for 3...
[09:26:09] Waiting for individual results: 
[09:26:09] result 0: 0
[09:26:10] 	loiter(1): done. 
[09:26:10] 				loiter(4): doing nothing for 4...
[09:26:10] result 1: 10
[09:26:11] 		loiter(2): done. 
[09:26:11] result 2: 20
[09:26:12] 			loiter(3): done. 
[09:26:12] result 3: 30
[09:26:14] 				loiter(4): done. 
[09:26:14] result 4: 40
[09:26:14] Results: <generator object Executor.map.<locals>.result_iterator at 0x000001996541BA00>


Integrating `tqdm` provides an opportunity to look deeper into how the concurrent scripts actually work, by forcing us to use the `futures.as_completed` and `asyncio.as_completed` functions so that `tqdm` can display progress as each future completes. 

In [27]:
from tqdm import tqdm
# Minimal tqdm demo: wrapping the iterable draws a progress bar that advances
# once per iteration; each of the ten iterations sleeps for 0.1 s.
for i in tqdm(range(10)):
    time.sleep(.1)

100%|██████████| 10/10 [00:01<00:00,  9.05it/s]
