Concurrent Web Downloads Examples

In [5]:
import time
from pathlib import Path
from typing import Callable

import httpx

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'https://www.fluentpython.com/data/flags'
DEST_DIR = Path('downloaded')

def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1, allow_redirects=True)
    resp.raise_for_status()
    return resp.content

def download_many(cc_list: list[str]) -> int:
    for cc in sorted(cc_list):
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)
    return len(cc_list)

def main(downloader: Callable[[list[str]], int]) -> None:
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')
    
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 downloads in 5.20s


In [6]:
from concurrent import futures

import time
from pathlib import Path
from typing import Callable

import httpx

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'https://www.fluentpython.com/data/flags'
DEST_DIR = Path('downloaded')

def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1, allow_redirects=True)
    resp.raise_for_status()
    return resp.content

def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    with futures.ThreadPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

def main(downloader: Callable[[list[str]], int]) -> None:
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')
    
main(download_many)

FR BD NG IN DE CN PH IR EG BR RU PK JP ID MX ET TR US CD VN 
20 downloads in 0.77s


In [9]:
from concurrent import futures

import time
from pathlib import Path
from typing import Callable

import httpx

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'https://www.fluentpython.com/data/flags'
DEST_DIR = Path('downloaded')

def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1, allow_redirects=True)
    resp.raise_for_status()
    return resp.content

def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')
        
        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')
            
    return len(list(res))

def main(downloader: Callable[[list[str]], int]) -> None:
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')
    
main(download_many)

Scheduled for BR: <Future at 0x7c2423568470 state=running>
Scheduled for CN: <Future at 0x7c2423569610 state=running>
Scheduled for ID: <Future at 0x7c24235615e0 state=running>
Scheduled for IN: <Future at 0x7c2422bd2810 state=pending>
Scheduled for US: <Future at 0x7c2422bd3530 state=pending>
CN <Future at 0x7c2423569610 state=finished returned str> result: 'CN'
ID <Future at 0x7c24235615e0 state=finished returned str> result: 'ID'
IN <Future at 0x7c2422bd2810 state=finished returned str> result: 'IN'
BR <Future at 0x7c2423568470 state=finished returned str> result: 'BR'
US <Future at 0x7c2422bd3530 state=finished returned str> result: 'US'

2 downloads in 0.41s


Concurrent Executors Examples

In [13]:
import sys
import random
import math
from concurrent import futures
from time import perf_counter
from typing import NamedTuple

NUMBERS = random.sample(range(1, (10 ** 16) - 1), 20)
NUMBERS.sort()

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    
    root = math.isqrt(n)
    for i in range(3, root + 1, 2):
        if n % i == 0:
            return False
    return True

class PrimeResult(NamedTuple):
    n: int
    flag: bool
    elapsed: float

def check(n: int) -> PrimeResult:
    t0 = perf_counter()
    res = is_prime(n)
    return PrimeResult(n, res, perf_counter() - t0)

def main() -> None:
    if len(sys.argv) < 3:
        workers = None
    else:
        workers = int(sys.argv[1])
    
    executor = futures.ProcessPoolExecutor(workers)
    actual_workers = executor._max_workers # type: ignore
    
    print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')
    t0 = perf_counter()
    numbers = sorted(NUMBERS, reverse=True)
    with executor:
        for n, prime, elapsed in executor.map(check, numbers):
            label = 'P' if prime else ' '
            print(f'{n:16}  {label} {elapsed:9.6f}s')

    time = perf_counter() - t0
    print(f'Total time: {time:.2f}s')

main()

Checking 20 numbers with 12 processes:
9932991660630535     0.000024s
9883544377252488     0.000007s
9802528211098268     0.000010s
8637625169726522     0.000007s
8072291908794933     0.000023s
7712535165216916     0.000010s
7641876732334794     0.000009s
7549040678280958     0.000009s
7419399153565079     0.000020s
6657809980027327     0.000022s
4102433297313846     0.000001s
3726457481763849     0.000020s
3485584330396927     0.000008s
2810743094773289     0.000014s
2691568994980355     0.000004s
2435158201366758     0.000002s
2397438820547955     0.000019s
1116892557355165     0.000013s
 894572718345947     0.000309s
 466882224938074     0.000002s
Total time: 0.06s


Experimenting with Executor.map Examples

In [14]:
from time import sleep, strftime
from concurrent import futures

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}): doing nothing for ()s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display(f'result {i}: {result}')

main()

[06:57:55] Script starting.
[06:57:55] loiter(0): doing nothing for ()s...
[06:57:55] 	loiter(1): doing nothing for ()s...
[06:57:55] 		loiter(2): doing nothing for ()s...
[06:57:55] loiter(0): done.
[06:57:55] results: <generator object Executor.map.<locals>.result_iterator at 0x7c24221f9b70>
[06:57:55] Waiting for individual results:
[06:57:55] result 0: 0
[06:57:55] 			loiter(3): doing nothing for ()s...
[06:57:56] 	loiter(1): done.
[06:57:56] 				loiter(4): doing nothing for ()s...
[06:57:56] result 1: 10
[06:57:57] 		loiter(2): done.
[06:57:57] result 2: 20
[06:57:58] 			loiter(3): done.
[06:57:58] result 3: 30
[06:58:00] 				loiter(4): done.
[06:58:00] result 4: 40
