In [4]:
from concurrent import futures

In [5]:
import time
from pathlib import Path
from typing import Callable

In [6]:
import httpx

In [7]:
# The 20 two-letter country codes whose flags get downloaded
# (main() hands this list to the downloader).
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()

In [8]:
# Root URL of the flag images; get_flag() appends `/<cc>/<cc>.gif` to it.
BASE_URL = 'https://www.fluentpython.com/data/flags'

In [9]:
# Relative directory that main() creates and save_flag() writes the images into.
DESC_DIR = Path('downloaded')

In [16]:
def save_flag(img: bytes, filename: str) -> None:
    """Store the raw image bytes ``img`` as ``filename`` inside ``DESC_DIR``.

    The directory itself is not created here, so it has to exist already.
    """
    target = DESC_DIR / filename
    target.write_bytes(img)

In [11]:
def get_flag(cc: str) -> bytes:
    """Download the GIF flag for country code ``cc`` and return its bytes.

    The request URL is ``{BASE_URL}/{cc}/{cc}.gif``, lower-cased as a whole.
    Redirects are followed and the request times out after 6.1 seconds.

    Raises:
        httpx.HTTPStatusError: if the response has a 4xx or 5xx status.
    """
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    # TLS certificate verification stays at httpx's default (enabled).
    # If a private CA is required, configure a CA bundle / SSL context
    # through `verify=` rather than switching verification off.
    resp = httpx.get(url, timeout=6.1, follow_redirects=True)
    resp.raise_for_status()
    return resp.content

In [17]:
def download_many(cc_list: list[str]) -> int:
    """Fetch and save every flag in ``cc_list`` one after another.

    Codes are processed in sorted order; each code is printed once its
    image has been saved. Returns the number of codes in ``cc_list``.
    """
    ordered = sorted(cc_list)
    for code in ordered:
        save_flag(get_flag(code), f'{code}.gif')
        print(code, end=' ', flush=True)
    return len(cc_list)

In [13]:
def main(downloader: Callable[[list[str]], int]) -> None:
    """Run ``downloader`` over ``POP20_CC`` and report how long it took.

    Args:
        downloader: callable that receives the list of country codes and
            returns the number of flags it downloaded.

    Creates ``DESC_DIR`` if it is missing, then prints the count returned
    by ``downloader`` together with the elapsed wall-clock time.
    """
    DESC_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    # Call the strategy that was passed in, not whichever `download_many`
    # happens to be bound in the global namespace at the moment.
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')

In [14]:
# main(download_many)

## ThreadPoolExecutor

In [15]:
# redefine download_many

In [39]:
def download_one(cc: str) -> str:
    """Fetch the flag for ``cc``, save it as ``<cc>.gif`` and echo the code.

    Returns ``cc`` unchanged so callers can tell which download finished.
    """
    save_flag(get_flag(cc), f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

In [55]:
def download_many(cc_list: list[str]) -> int:
    """Download the flags in ``cc_list`` concurrently on a thread pool.

    ``download_one`` is mapped over the sorted codes; the return value is
    the number of results produced.
    """
    ordered = sorted(cc_list)
    with futures.ThreadPoolExecutor() as pool:
        # The executor calls `download_one' concurrently for each code.
        results = pool.map(download_one, ordered)
    # Leaving the with-block waits for the workers, so every result is
    # ready here; iterating re-raises any exception a worker hit.
    return sum(1 for _ in results)

In [56]:
main(download_many)

FR BDNG  ID PKDE  IR BRIN  CN PH ET EGCD  JP MX RU TRUS  VN 
20 downloads in 0.32s


In [64]:
# download_many take #2

In [53]:
def download_many(cc_list: list[str]) -> int:
    """Download flags with explicit futures and report them as they finish.

    Only the first five entries of ``cc_list`` are used, to keep the demo
    output short. Each code is submitted to a 3-worker thread pool, and the
    futures are then consumed with ``as_completed``.

    Returns:
        The number of completed downloads (0 when ``cc_list`` is empty).

    Raises:
        Whatever exception ``download_one`` raised, re-raised by
        ``future.result()``.
    """
    cc_list = cc_list[:5]
    # Bind `count` up front: with no futures the enumerate loop below never
    # runs, and `return count` would otherwise raise UnboundLocalError.
    count = 0
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')

        # None of the calls in `download_many' is blocking,
        # as as_completed yields futures one by one,
        # in the order they finish.
        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')

    return count

In [54]:
main(download_many)

Scheduled for BR: <Future at 0x10b3b6590 state=running>
Scheduled for CN: <Future at 0x10addbd90 state=running>
Scheduled for ID: <Future at 0x10ba35e40 state=running>
Scheduled for IN: <Future at 0x10ae58400 state=pending>
Scheduled for US: <Future at 0x10ba365c0 state=pending>
ID <Future at 0x10ba35e40 state=finished returned str> result: 'ID'
BR <Future at 0x10b3b6590 state=finished returned str> result: 'BR'
CN <Future at 0x10addbd90 state=finished returned str> result: 'CN'
IN <Future at 0x10ae58400 state=finished returned str> result: 'IN'
US <Future at 0x10ba365c0 state=finished returned str> result: 'US'

5 downloads in 0.21s


In [52]:
# Note:
# IN and US are submitted last and start out pending (only 3 workers),
# so they will normally be the last two to finish.
# If we set max_workers to 1, the downloads run sequentially.

## ProcessPoolExecutor

In [63]:
# redefine download_many

In [61]:
def download_many(cc_list: list[str]) -> int:
    """Download the flags in ``cc_list`` concurrently on a process pool.

    ``download_one`` is mapped over the sorted codes; the return value is
    the number of results produced.
    """
    ordered = sorted(cc_list)
    with futures.ProcessPoolExecutor() as pool:
        # The executor calls `download_one' concurrently for each code.
        results = pool.map(download_one, ordered)
    # Leaving the with-block waits for the workers, so every result is
    # ready here; iterating re-raises any exception a worker hit.
    return sum(1 for _ in results)

In [62]:
# main(download_many)