# Chapter 20: Concurrent Executors

## A Sequential Download Script

In [None]:
import time
from pathlib import Path
from typing import Callable


import httpx  # <1>

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()  # <2>

BASE_URL = 'https://www.fluentpython.com/data/flags'  # <3>
DEST_DIR = Path('downloaded')                         # <4>

def save_flag(img: bytes, filename: str) -> None:     # <5>
    (DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:  # <6>
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1,       # <7>
                    follow_redirects=True)   # <8>
    resp.raise_for_status()  # <9>
    return resp.content


def download_many(cc_list: list[str]) -> int:  # <10>
    for cc in sorted(cc_list):                 # <11>
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)         # <12>
    return len(cc_list)

def main(downloader: Callable[[list[str]], int]) -> None:  # <13>
    DEST_DIR.mkdir(exist_ok=True)                          # <14>
    t0 = time.perf_counter()                               # <15>
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')

if __name__ == '__main__':
    main(download_many)     # <16>

1. Import the `httpx` library. It's not part of the standard library, so by convention the import goes after the standard library imports and a blank line.

2. List of the ISO 3166 country codes for the most populous countries in order of descending population. The list is from [Wikipedia](https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population).

3. The base URL of the directory with the flag images.

4. Local directory where the images are saved. 

5. Save the `img` bytes to `filename` in the `DEST_DIR` directory.

6. Given a country code, build the URL and download the image, returning the binary contents of the response.

7. It's good practice to add a sensible timeout to network operations, to avoid blocking for several minutes for no good reason.

8. By default, HTTPX does not follow redirects.

9. There's no error handling in this script, but this method raises an exception if the HTTP status is not in the 2XX range--highly recommended to avoid silent failures.

10. `download_many` is the key function to compare with the concurrent implementations.

11. Loop over the list of country codes in alphabetical order, to make it easy to see that the ordering is preserved in the output; return the number of country codes downloaded.

12. Display one country code at a time in the same line so we can see progress as each download happens. The `end=' '` argument replaces the usual line break at the end of each line with a space, so the next `print` call continues on the same line. The `flush=True` argument is needed to display the output immediately: by default, Python's standard output is line buffered, meaning that printed characters are displayed only after a line break or when the buffer is full.

13. `main` must be called with the function that will make the downloads; that way, we can use `main` as a library function with other implementations of `download_many` in the future.

14. Create `DEST_DIR` if it doesn't exist; don't raise an exception if it already exists.

15. Record and report the elapsed time after running the `downloader` function.

16. Call `main` with `download_many` to test the script.
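
Callout 13 is the design choice the rest of the chapter relies on: the timing code takes the downloader as an argument. Here is a minimal sketch of that idea in isolation. `timed_run` and `fake_download_many` are hypothetical names used only for illustration; the fake downloader sleeps instead of touching the network.

```python
import time
from typing import Callable


def timed_run(downloader: Callable[[list[str]], int],
              codes: list[str]) -> tuple[int, float]:
    """Run any downloader with the same signature and time it."""
    t0 = time.perf_counter()
    count = downloader(codes)
    return count, time.perf_counter() - t0


def fake_download_many(cc_list: list[str]) -> int:
    """Hypothetical stand-in: pretends each download takes 10 ms."""
    for _ in sorted(cc_list):
        time.sleep(0.01)
    return len(cc_list)


count, elapsed = timed_run(fake_download_many, ['BR', 'CN', 'IN'])
print(f'{count} downloads in {elapsed:.2f}s')
```

Any function that accepts a list of country codes and returns a count can be swapped in, which is how the threaded versions below reuse `main` unchanged.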

## Downloading with concurrent.futures

In [None]:
# flags_threadpool.py: threaded download script using futures.ThreadPoolExecutor

from concurrent import futures

def download_one(cc: str):  #2
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    with futures.ThreadPoolExecutor() as executor: #3
        res = executor.map(download_one, sorted(cc_list)) #4
    return len(list(res)) #5

if __name__ == '__main__':
    main(download_many)


1. Reuse some functions from the `flags` module. (Not needed in a Jupyter notebook, where the earlier cell already defined them.)

2. Function to download a single image; this is what each worker will execute.

3. Instantiate a `ThreadPoolExecutor` as a context manager; the `executor.__exit__` method will call `executor.shutdown(wait=True)`, which will block until all threads are done.

4. The `map` method is similar to the `map` built-in, except the `download_one` function will be called concurrently from multiple threads; it returns a generator that you can iterate to retrieve the value returned by each function call--in this case, each call to `download_one` will return a country code.

5. Return the number of results obtained. If any of the threaded calls raises an exception, that exception is raised here when the implicit `next()` call inside the `list` constructor tries to retrieve the corresponding value from the generator.

6. Call the main function from the `flags` module, passing the concurrent version of `download_many`.
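
To make callout 5 concrete, here is a small sketch of where an exception from a worker surfaces. The `risky` and `collect` functions are hypothetical and not part of the flags scripts; the point is only that the error appears while iterating over the results, not when `executor.map` is called.

```python
from concurrent import futures


def risky(n: int) -> int:
    """Hypothetical task that fails for one specific input."""
    if n == 2:
        raise ValueError(f'bad input: {n}')
    return n * 10


def collect(values):
    collected = []
    error = None
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(risky, values)  # no exception raised here
        try:
            for result in results:  # each step retrieves the next result in order
                collected.append(result)
        except ValueError as exc:
            error = exc  # raised when the failing call's result is retrieved
    return collected, error


collected, error = collect([0, 1, 2, 3])
print(collected, repr(error))
```

The results that come before the failing call are still delivered; iteration stops at the first call that raised.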

The first and most important argument of the `ThreadPoolExecutor` constructor is `max_workers`, which sets the maximum number of worker threads to run. When `max_workers` is `None` or not given, its default value is computed with the following expression (since Python 3.8):

```python
max_workers = min(32, os.cpu_count() + 4)
```

> This default value preserves at least 5 threads for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. It avoids using very large resources implicitly on many-core machines.
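
To see what the formula gives for different machines, here is a small sketch. `default_workers` is a hypothetical helper that only mirrors the expression above; the real computation happens inside `ThreadPoolExecutor`, and the exact way CPUs are counted may differ between Python versions.

```python
import os
from typing import Optional


def default_workers(cpu_count: Optional[int]) -> int:
    """Hypothetical helper mirroring the documented formula.

    os.cpu_count() may return None, so fall back to 1 in that case.
    """
    return min(32, (cpu_count or 1) + 4)


for cpus in (1, 8, 64):
    print(f'{cpus:2} CPUs -> {default_workers(cpus)} threads')

print('this machine:', default_workers(os.cpu_count()))
```

A single-core machine still gets 5 threads, and a 64-core machine is capped at 32, which matches the quoted rationale.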


## Where Are the Futures?

Futures are core components of `concurrent.futures` and of `asyncio`. Since Python 3.4, there are `concurrent.futures.Future` and `asyncio.Future`. They serve the same purpose: to represent a deferred computation that may or may not have completed yet. The difference is that `concurrent.futures.Future` is designed to be used with `concurrent.futures.Executor` objects, while `asyncio.Future` is intended for use with `asyncio` event loops.

You should not create them: they are meant to be instantiated exclusively by the concurrency framework. Here is why: a `Future` represents something that will eventually run, therefore it must be scheduled to run, and that's the job of the framework. For example, the `Executor.submit()` method takes a callable, schedules it to run, and returns a `Future`.
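
A minimal sketch of that life cycle, using a hypothetical `square` function: `submit` hands back a `Future`, `result()` blocks until the value is ready, and `add_done_callback` registers a function to be called with the future once it is done.

```python
from concurrent import futures


def square(n: int) -> int:
    return n * n


callback_log: list[int] = []

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(square, 7)  # the executor creates the Future
    future.add_done_callback(lambda f: callback_log.append(f.result()))
    value = future.result()              # blocks until the call has finished

# Leaving the with block waits for the worker, so the callback has run by now.
print(value, future.done(), callback_log)
```

Note that the code never calls the `Future` constructor; it only uses the object the executor returned.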

Futures and threads serve different purposes, even though both show up whenever work is run concurrently.

1. Conceptual Difference:
   - Futures: Futures are high-level abstractions that represent the result of an asynchronous operation or a task that may not have completed yet. They allow you to work with the result of an asynchronous computation when it becomes available.
   - Threads: Threads, on the other hand, are lower-level constructs that represent a separate flow of execution within a process. They enable concurrent execution of multiple tasks or operations.

2. Concurrency vs. Asynchrony:
   - Futures: Futures are primarily used for asynchronous programming, where you can initiate multiple tasks concurrently and continue with other operations while waiting for the results. Futures provide a way to synchronize and retrieve the results of these asynchronous tasks.
   - Threads: Threads are the mechanism that actually runs code concurrently within a process. In CPython, the GIL lets only one thread execute Python bytecode at a time, so threads pay off mainly for I/O-bound tasks such as the flag downloads in this chapter; computationally intensive tasks that need multiple CPU cores are usually better served by processes.

3. Resource Consumption:
   - Futures: A future itself is a lightweight object that holds the state and the result of one task. Because an executor runs many futures on a fixed pool of workers, you can queue a large number of tasks without creating a thread for each one.
   - Threads: Threads, being lower-level constructs, tend to consume more system resources, such as memory and CPU time. Creating and managing threads can have overhead, and having too many threads can lead to decreased performance or even resource exhaustion.

In summary, futures and threads have different purposes and use cases. Futures are designed for managing asynchronous computations and retrieving results, while threads enable concurrent execution of multiple tasks. Understanding these differences can help you choose the appropriate approach based on your specific requirements and constraints.
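
The contrast is easier to see side by side. This is only a sketch with a hypothetical `double` function: a bare `threading.Thread` has no return value, so the result has to be stashed somewhere shared, while a future carries the result back to the caller.

```python
import threading
from concurrent import futures


def double(n: int) -> int:
    return n * 2


# Plain thread: the target's return value is discarded,
# so the result is written into a shared dict instead.
box: dict[str, int] = {}


def target() -> None:
    box['result'] = double(21)


thread = threading.Thread(target=target)
thread.start()
thread.join()
via_thread = box['result']

# Future: the executor manages the thread, and the future holds the result.
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(double, 21)
    via_future = future.result()

print(via_thread, via_future)
```

In both cases a thread does the work; the future is the handle used to wait for and retrieve the outcome.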

In [None]:
# flags_threadpool_futures.py: replacing `executor.map` with `executor.submit` 
# and `futures.as_completed` in the `download_many` function

def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5] #1
    with futures.ThreadPoolExecutor(max_workers=3) as executor: #2
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list): #3
            future = executor.submit(download_one, cc) #4
            to_do.append(future) #5
            print(f'Scheduled for {cc}: {future}') #6            

        count = 0  # stays 0 if nothing was scheduled
        for count, future in enumerate(futures.as_completed(to_do), 1): #7
            res: str = future.result() #8
            print(f'{future} result: {res!r}') #9
    
    return count

if __name__ == '__main__':
    main(download_many)

Scheduled for BR: <Future at 0x1ff9009bc80 state=running>
Scheduled for CN: <Future at 0x1ff9009ad50 state=running>
Scheduled for ID: <Future at 0x1ff9118e3f0 state=running>
Scheduled for IN: <Future at 0x1ff91216630 state=pending>
Scheduled for US: <Future at 0x1ff91215a30 state=pending>
ID ThreadPoolExecutor-6_2 <Future at 0x1ff9118e3f0 state=finished returned str> result: 'ID'
CN ThreadPoolExecutor-6_1 <Future at 0x1ff9009ad50 state=finished returned str> result: 'CN'
BR ThreadPoolExecutor-6_0 <Future at 0x1ff9009bc80 state=finished returned str> result: 'BR'
IN ThreadPoolExecutor-6_2 <Future at 0x1ff91216630 state=finished returned str> result: 'IN'
US ThreadPoolExecutor-6_1 <Future at 0x1ff91215a30 state=finished returned str> result: 'US'

5 downloads in 7.69s


1. For this demonstration, use only the top five most populous countries.

2. Set `max_workers` to 3, so we can see pending futures in the output.

3. Iterate over country codes alphabetically, to make it clear that results will arrive out of order. The output may be different every time you run the script.

4. `executor.submit` schedules the callable to be executed, and returns a `future` representing this pending operation.

5. Store each `future` so we can later retrieve them with `as_completed`.

6. Display a message with the country code and the respective `future` object.

7. `as_completed` yields futures as they are completed.

8. Get the result of this future.

9. Display the future and its result.
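
Because `as_completed` yields futures in completion order, the loop no longer knows which input each future belongs to. A common way around this, sketched here with a hypothetical `nap` function and made-up labels, is to keep a dict that maps each future to the argument it was submitted with.

```python
from concurrent import futures
from time import sleep


def nap(seconds: float) -> float:
    """Hypothetical task: sleep, then report how long it slept."""
    sleep(seconds)
    return seconds


delays = {'slow': 0.3, 'quick': 0.05, 'medium': 0.15}
finished: list[str] = []

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the label it was submitted for.
    to_do_map = {executor.submit(nap, secs): label
                 for label, secs in delays.items()}
    for future in futures.as_completed(to_do_map):  # iterates over the dict keys
        label = to_do_map[future]
        finished.append(label)
        print(f'{label}: slept {future.result()}s')
```

With three workers the labels will typically be printed as quick, medium, slow, even though they were submitted in a different order; the exact order depends on thread scheduling.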

```text
Scheduled for BR: <Future at 0x25fc79380e0 state=running> #1
Scheduled for CN: <Future at 0x25fc86f8470 state=running>
Scheduled for ID: <Future at 0x25fc76f6c60 state=running>
Scheduled for IN: <Future at 0x25fc7958410 state=pending> #2
Scheduled for US: <Future at 0x25fc7958fe0 state=pending>
CN <Future at 0x25fc86f8470 state=finished returned str> result: 'CN' #3
ID BR <Future at 0x25fc76f6c60 state=finished returned str> result: 'ID' #4
<Future at 0x25fc79380e0 state=finished returned str> result: 'BR'
IN <Future at 0x25fc7958410 state=finished returned str> result: 'IN'
US <Future at 0x25fc7958fe0 state=finished returned str> result: 'US'
```

1. The futures are scheduled in alphabetical order; the `repr()` of a future shows its state: the first three are `running`, because there are three worker threads.

2. The last two futures are `pending`, waiting for worker threads.

3. The first `CN` here is the output of `download_one` in a worker thread; the rest of the line is the output of `download_many`.

4. Here, two threads output codes before `download_many` in the main thread can display the result of the first thread.

To see which worker thread ran each download, `download_one` can also print `threading.current_thread().name`:

In [3]:
import threading
from concurrent import futures

def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    print(threading.current_thread().name, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')            
        
        count = 0
        for count, future in enumerate(futures.as_completed(to_do), 1):  # futures arrive as they complete
            res: str = future.result()
            print(f'{future} result: {res!r}')
    
    return count

if __name__ == '__main__':
    main(download_many)

Scheduled for BR: <Future at 0x272435a5ff0 state=running>
Scheduled for CN: <Future at 0x2724355e5f0 state=running>
Scheduled for ID: <Future at 0x272465a3820 state=running>
Scheduled for IN: <Future at 0x272465dbb80 state=pending>
Scheduled for US: <Future at 0x272465dbb50 state=pending>
ID ThreadPoolExecutor-0_2 <Future at 0x272465a3820 state=finished returned str> result: 'ID'
IN ThreadPoolExecutor-0_2 <Future at 0x272465dbb80 state=finished returned str> result: 'IN'
CN ThreadPoolExecutor-0_1 <Future at 0x2724355e5f0 state=finished returned str> result: 'CN'
US ThreadPoolExecutor-0_2 <Future at 0x272465dbb50 state=finished returned str> result: 'US'
BR ThreadPoolExecutor-0_0 <Future at 0x272435a5ff0 state=finished returned str> result: 'BR'

5 downloads in 2.90s
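
The names in the output above follow the default pattern: an executor counter followed by the worker index. If more readable names help, `ThreadPoolExecutor` accepts a `thread_name_prefix` argument. A small sketch follows; the `whoami` function and the `'flags'` prefix are made up for illustration.

```python
import threading
from concurrent import futures


def whoami(task: int) -> tuple[int, str]:
    """Return the task number and the name of the thread that ran it."""
    return task, threading.current_thread().name


with futures.ThreadPoolExecutor(max_workers=2,
                                thread_name_prefix='flags') as executor:
    names = [name for _, name in executor.map(whoami, range(4))]

print(names)  # which worker ran which task may vary between runs
```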


## Launching Processes with `concurrent.futures`

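`futures.ProcessPoolExecutor()` generally doesn't work with functions defined in Jupyter Notebook cells: the worker processes must be able to import the `__main__` module, which is not possible in an interactive session. Save the example below as a script and run it from the command line.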

The `ProcessPoolExecutor` class is similar to `ThreadPoolExecutor`, but it uses processes instead of threads. The API is the same, but the underlying implementation is different.

In [None]:
import sys
from concurrent import futures #1
from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple): #2
    number: int
    flag: bool
    elapsed: float
    
def check(n: int) -> PrimeResult:
    t0 = perf_counter()
    res = is_prime(n)
    elapsed = perf_counter() - t0
    return PrimeResult(n, res, elapsed)

def main(workers = None) -> None: #3
    executor = futures.ProcessPoolExecutor(workers) #4
    actual_workers = executor._max_workers #5
    
    print(f'Checking {len(NUMBERS)} numbers using {actual_workers} workers')
    
    t0 = perf_counter()
    
    numbers = sorted(NUMBERS, reverse=True) #6
    with executor: #7
        for number, prime, elapsed in executor.map(check, numbers): #8
            label = 'P' if prime else ' '
            print(f'{number:16}  {label} {elapsed:9.6f}s')
    time = perf_counter() - t0
    print(f'Total time: {time:.2f}s')
    

if __name__ == '__main__':
    main(10)  

1. No need to import `multiprocessing`, `SimpleQueue`, etc.; `concurrent.futures` takes care of that.

2. The `PrimeResult` tuple and the `check` function are the same as we saw in `procs.py`, but we don't need queues and the `worker` function anymore.

3. If `main` is called without an argument, `workers` stays `None` and `ProcessPoolExecutor` decides how many workers to use, instead of us deciding ourselves.

4. Here I build the `ProcessPoolExecutor` before the `with` block (callout 7), so that I can display the actual number of workers used.

5. `_max_workers` is an undocumented instance attribute of `ProcessPoolExecutor` that holds the number of worker processes.

6. Sort the numbers to be checked in descending order. This will expose a difference in the behavior of `proc_pool.py` when compared to `procs.py`.

7. Use the `executor` as a context manager. The `executor.__exit__` method calls `executor.shutdown(wait=True)`, which will block until all processes are done.

8. The `executor.map` call returns the `PrimeResult` instances returned by `check` in the same order as the `numbers` arguments.
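
The script imports `is_prime` and `NUMBERS` from a `primes` module that is not shown in these notes. To try the script without it, a stand-in along these lines should do. This is an assumption, not the original module: a plain trial-division test and a short, made-up `NUMBERS` list.

```python
# primes.py (hypothetical stand-in for the module imported above)
import math

# Made-up sample; the real list holds much larger numbers.
NUMBERS = [2, 91, 97, 7919]


def is_prime(n: int) -> bool:
    """Trial division by odd numbers up to the integer square root."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    root = math.isqrt(n)
    for i in range(3, root + 1, 2):
        if n % i == 0:
            return False
    return True


for number in NUMBERS:
    print(number, is_prime(number))
```

With numbers this small every check is nearly instant, so the timing differences visible in the output below only show up with inputs in the 16-digit range.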

```bash
Checking 20 numbers using 10 workers
9999999999999999     0.000004s  #1
9999999999999917  P  4.763032s  #2
7777777777777777     0.000004s  #3
7777777777777753  P  3.443294s
7777777536340681     3.446641s
6666667141414921     6.048437s
6666666666666719  P  5.974608s
6666666666666666     0.000001s
5555555555555555     0.000004s
5555555555555503  P  5.439129s
5555553133149889     4.971766s
4444444488888889     5.156169s
4444444444444444     0.000001s
4444444444444423  P  5.367763s
3333335652092209     5.046108s
3333333333333333     0.000005s
3333333333333301  P  2.220393s
 299593572317531  P  0.664979s
 142702110479723  P  0.456387s
               2  P  0.000001s
Total time: 6.30s
```

1. This line appears very quickly.

2. This line takes more than 4.6s to show up.

3. All the remaining lines appear almost immediately.
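
The pattern in callouts 2 and 3 follows from `executor.map` returning results in submission order: a slow early task holds back results that are already finished behind it. The sketch below reproduces the effect with threads and `sleep` instead of processes and prime checks; `slow_identity` and the delays are made up for illustration.

```python
from concurrent import futures
from time import perf_counter, sleep


def slow_identity(delay: float) -> float:
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    sleep(delay)
    return delay


delays = [0.3, 0.1, 0.05]  # the slowest task is submitted first
arrivals: list[tuple[float, float]] = []

t0 = perf_counter()
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    for value in executor.map(slow_identity, delays):
        # Record when the main thread actually received each result.
        arrivals.append((value, perf_counter() - t0))

for value, when in arrivals:
    print(f'result {value} received after {when:.2f}s')
```

All three tasks run at the same time, but the two quick results are only handed over after the 0.3s task, so all three arrive at roughly the same moment.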

## Experimenting with Executor.map

In [11]:
# Example 20-8. demo_executor_map.py: Simple demonstration
# of the ThreadPoolExecutor
from time import sleep, strftime
from concurrent import futures

def display(*args): #1
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)
    
    
def loiter(n): #2
    msg = '{}loiter({}):doing nothing for {}s'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10 #3
    
def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3) #4
    results = executor.map(loiter, range(6)[::-1])    #5
    display('results:', results)    #6
    display('Waiting for individual results:')
    for i, result in enumerate(results):    #7
        display(f'result {i}: {result}')
    

if __name__ == '__main__':
    main()

[11:08:12] Script starting.
[11:08:12] 					loiter(5):doing nothing for 5s
[11:08:12] 				loiter(4):doing nothing for 4s
[11:08:12] 			loiter(3):doing nothing for 3s
[11:08:12] results: <generator object Executor.map.<locals>.result_iterator at 0x0000027246C8FA70>
[11:08:12] Waiting for individual results:
[11:08:15] 			loiter(3): done.
[11:08:15] 		loiter(2):doing nothing for 2s
[11:08:16] 				loiter(4): done.
[11:08:16] 	loiter(1):doing nothing for 1s
[11:08:17][11:08:17] 		loiter(2): done.
[11:08:17] loiter(0):doing nothing for 0s
[11:08:17] loiter(0): done.
 					loiter(5): done.
[11:08:17] result 0: 50
[11:08:17] result 1: 40
[11:08:17] result 2: 30
[11:08:17] result 3: 20
[11:08:17] 	loiter(1): done.
[11:08:17] result 4: 10
[11:08:17] result 5: 0


1. This function simply prints whatever arguments it gets, preceded by a timestamp in the format [HH:MM:SS].

2. `loiter` does nothing except display a message when it starts, sleep for `n` seconds, then display a message when it ends; tabs are used to indent the messages according to the value of `n`.

3. `loiter` returns `n * 10` so we can see how to collect results.

4. Create a `ThreadPoolExecutor` with three threads.

5. Submit six tasks to the `executor`, in descending order of `n`. Since there are only three threads, only three of those tasks will start immediately: the calls `loiter(5)`, `loiter(4)`, and `loiter(3)`. The other three tasks will be queued.

6. Immediately display the results of invoking `executor.map`: it's a generator, as the output shows.

7. The `enumerate` call in the `for` loop will implicitly invoke `next(results)`, which in turn will invoke `_f.result()` on the (internal) `_f` future representing the first call, `loiter(5)`. The `result` method will block until the future is done, therefore each iteration in this loop will have to wait for the next result to be ready.
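
One way to picture what callout 7 describes is to rebuild the behavior from `submit` and `result`. The `ordered_map` function below is a simplified sketch, not the standard library source (the real `Executor.map` also handles timeouts and cancels pending futures on error), and `times_ten` is a made-up task.

```python
from concurrent import futures
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar('T')
R = TypeVar('R')


def ordered_map(executor: futures.Executor,
                fn: Callable[[T], R],
                items: Iterable[T]) -> Iterator[R]:
    """Simplified sketch of Executor.map: submit everything, yield in order."""
    # All tasks are submitted right away, before any result is requested.
    fs = [executor.submit(fn, item) for item in items]

    def result_iterator() -> Iterator[R]:
        for f in fs:
            yield f.result()  # blocks until this particular future is done

    return result_iterator()


def times_ten(n: int) -> int:
    return n * 10


with futures.ThreadPoolExecutor(max_workers=3) as executor:
    mine = list(ordered_map(executor, times_ten, range(6)[::-1]))
    stdlib = list(executor.map(times_ten, range(6)[::-1]))

print(mine)
print(stdlib)
```

Both calls return a generator immediately; the waiting only happens when the generator is consumed, one future at a time, in the order the tasks were submitted.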