# 🫁 ThreadPoolExecutor: Thread Pools

The standard library module `concurrent.futures` provides two classes: `ThreadPoolExecutor` (a thread pool) and `ProcessPoolExecutor` (a process pool).
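Both classes derive from the abstract base class `concurrent.futures.Executor`, so code written only against that interface (`submit()`, `map()`, `shutdown()`) can accept either pool. A minimal sketch, assuming a hypothetical helper `square_all` that takes any executor. Only the thread pool is exercised here, because a process pool also needs picklable, module-level functions and, on some platforms, an `if __name__ == "__main__":` guard.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

# Both pool types share the same abstract base class
assert issubclass(ThreadPoolExecutor, Executor)
assert issubclass(ProcessPoolExecutor, Executor)


def square(x: int) -> int:
    return x * x


def square_all(executor: Executor, numbers: list[int]) -> list[int]:
    # Hypothetical helper: relies only on the Executor interface
    return list(executor.map(square, numbers))


with ThreadPoolExecutor(max_workers=2) as pool:
    squares = square_all(pool, [1, 2, 3])

print(squares)  # [1, 4, 9]
```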

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
```

```python
def run(delay):
    # Simulate a task that takes `delay` seconds, then return the delay
    time.sleep(delay)
    print(f'Delay Task {delay} Finished')
    return delay
```

## I. Basic Usage

```python
# Create a thread pool with at most 5 worker threads
with ThreadPoolExecutor(max_workers=5) as t:
    task1 = t.submit(run, 1)
    task2 = t.submit(run, 2)
    task3 = t.submit(run, 3)

    # Use done() to check whether each task has finished
    print(f"Task 1 Finished: {task1.done()}")
    print(f"Task 2 Finished: {task2.done()}")
    print(f"Task 3 Finished: {task3.done()}")

    time.sleep(2.5)
    print(f"Task 1 Finished: {task1.done()}")
    print(f"Task 2 Finished: {task2.done()}")
    print(f"Task 3 Finished: {task3.done()}")

    # Use result() to get the return value
    print('Task 1 Result: ', task1.result())
```

```
Task 1 Finished: False
Task 2 Finished: False
Task 3 Finished: False
Delay Task 1 Finished
Delay Task 2 Finished
Task 1 Finished: True
Task 2 Finished: True
Task 3 Finished: False
Task 1 Result:  1
Delay Task 3 Finished
```


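1. `submit()` schedules a task on the pool and immediately returns a `Future` object, a handle to the task.
2. `done()` reports whether the task has finished.
3. `result()` returns the task's return value, blocking until the task finishes if necessary.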
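The three calls above can be checked without relying on `sleep()` timings. A minimal sketch, using a `threading.Event` (an illustrative choice, not part of the original example) to hold the task until the main thread releases it; it also shows that `result()` re-raises an exception raised inside the worker. The helper names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

gate = threading.Event()  # keeps the first task blocked until we release it


def double_after_gate(x: int) -> int:
    gate.wait()
    return x * 2


def always_fails() -> None:
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(double_after_gate, 21)  # returns a Future immediately
    was_done = future.done()  # False: the task is still blocked on the gate
    gate.set()
    value = future.result()  # blocks until the task finishes
    is_done = future.done()  # True once result() has returned

    failing = pool.submit(always_fails)
    try:
        failing.result()  # the worker's exception is re-raised here
    except ValueError as exc:
        error_message = str(exc)

print(was_done, value, is_done, error_message)  # False 42 True boom
```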

## II. Key Functions and Methods

### 1. The `wait()` function

```python
wait(fs, timeout=None, return_when=ALL_COMPLETED)
```
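`fs`: the futures to wait on (for example, those returned by `submit()`)

`timeout`: the maximum number of seconds to wait; if it expires before the condition is met, `wait()` returns anyway

`return_when`: the condition under which `wait()` returns; the default `ALL_COMPLETED` returns once every future has finished (`FIRST_COMPLETED` and `FIRST_EXCEPTION` are the other options)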
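`wait()` returns a named 2-tuple `(done, not_done)` of sets of futures, which the example below ignores. A minimal sketch, assuming an event-gated task (hypothetical helper `blocked`) so that the outcome is deterministic rather than timing-dependent:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
import threading

release = threading.Event()


def quick() -> str:
    return "quick"


def blocked() -> str:
    release.wait()  # cannot finish until the main thread sets the event
    return "blocked"


with ThreadPoolExecutor(max_workers=2) as pool:
    slow = pool.submit(blocked)
    fast = pool.submit(quick)

    # Timeout expires: the done set is empty, the blocked future stays pending
    timed_done, timed_pending = wait([slow], timeout=0.1)

    # Returns as soon as any future finishes; only `fast` can finish here
    first_done, first_pending = wait([slow, fast], return_when=FIRST_COMPLETED)

    release.set()
    # Default ALL_COMPLETED: waits for every future
    all_done, all_pending = wait([slow, fast])

print(len(timed_done), fast in first_done, slow in first_pending, len(all_pending))
# 0 True True 0
```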

```python
from concurrent.futures import wait, FIRST_COMPLETED

# Create a thread pool with at most 5 worker threads
with ThreadPoolExecutor(max_workers=5) as t:
    all_task = [t.submit(run, delay) for delay in range(1, 5)]
    # Return as soon as the first task finishes
    wait(all_task, return_when=FIRST_COMPLETED)
    print('Continue Other Tasks')

    # Wait at most 2.5 more seconds for the remaining tasks
    wait(all_task, timeout=2.5)
    print('Continue Other Tasks')
```

```
Delay Task 1 Finished
Continue Other Tasks
Delay Task 2 Finished
Delay Task 3 Finished
Continue Other Tasks
Delay Task 4 Finished
```


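1. When the first task finishes (after about 1 s), the first `wait()` returns and the main thread continues.
2. The second `wait()` then waits up to 2.5 s more, returning at about 3.5 s in total. By then tasks 2 and 3 have finished, and only task 4 is still running.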
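Besides `FIRST_COMPLETED` and the default `ALL_COMPLETED`, `return_when` also accepts `FIRST_EXCEPTION`, which makes `wait()` return as soon as any future raises (or, if none raises, once all have completed). A minimal sketch with hypothetical helpers `blocked` and `broken`:

```python
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
import threading

release = threading.Event()


def blocked() -> str:
    release.wait()
    return "blocked"


def broken() -> None:
    raise RuntimeError("task failed")


with ThreadPoolExecutor(max_workers=2) as pool:
    slow = pool.submit(blocked)
    bad = pool.submit(broken)

    # Returns as soon as any future raises (or when all complete)
    done, not_done = wait([slow, bad], return_when=FIRST_EXCEPTION)
    failed = [f for f in done if f.exception() is not None]

    release.set()  # let the remaining task finish so the pool can shut down

print(len(failed), slow in not_done)  # 1 True
```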

### 2. The `as_completed()` function

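`as_completed()` yields each future to the main thread as soon as it finishes (in completion order, not submission order); call `result()` on it to get the return value.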

```python
from concurrent.futures import as_completed

# Create a thread pool with at most 5 worker threads
with ThreadPoolExecutor(max_workers=5) as t:
    obj_list = []
    for delay in range(1, 5):
        obj = t.submit(run, delay)
        obj_list.append(obj)

    # Futures are yielded in the order they finish
    for future in as_completed(obj_list):
        res = future.result()
        print(f'Task {future} Result: {res}')
```

```
Delay Task 1 Finished
Task <Future at 0x1d16d055fd0 state=finished returned int> Result: 1
Delay Task 2 Finished
Task <Future at 0x1d16e5e4f50 state=finished returned int> Result: 2
Delay Task 3 Finished
Task <Future at 0x1d16e9cd510 state=finished returned int> Result: 3
Delay Task 4 Finished
Task <Future at 0x1d16ea50390 state=finished returned int> Result: 4
```


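`as_completed()` returns an iterator. While none of the remaining futures has finished, the loop blocks; if `timeout` is set and some futures are still unfinished `timeout` seconds after the call, iteration raises `TimeoutError`.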
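Because `as_completed()` yields futures rather than inputs, a common pattern is to keep a dict from each future to the input that produced it. The sketch below (hypothetical helper `echo`; an event keeps one task unfinished) also exercises `timeout`: the two quick tasks are yielded, then the loop raises `TimeoutError` for the one still blocked.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

release = threading.Event()


def echo(x: int) -> int:
    if x == 0:
        release.wait()  # task 0 stays unfinished until released
    return x


with ThreadPoolExecutor(max_workers=3) as pool:
    # Map each future back to the input that produced it
    future_to_input = {pool.submit(echo, x): x for x in (0, 1, 2)}

    finished = []
    try:
        for future in as_completed(future_to_input, timeout=1.0):
            finished.append(future_to_input[future])
    except concurrent.futures.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    finally:
        release.set()  # unblock task 0 so the with-block can shut down

print(sorted(finished), timed_out)  # [1, 2] True
```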

### 3. The `Executor.map()` method

```python
map(fn, *iterables, timeout=None)
```

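`fn`: the function to run in the worker threads

`iterables`: the iterable(s) supplying the arguments for `fn`

`timeout`: similar to the `timeout` of `wait()`, but because `map()` returns the tasks' results rather than returning early, a result that is not available within `timeout` seconds of the original call raises `TimeoutError`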
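More than one iterable can be passed: as with the built-in `map()`, `fn` receives one argument from each iterable, and iteration stops at the shortest one. A minimal sketch with a hypothetical `power` function:

```python
from concurrent.futures import ThreadPoolExecutor


def power(base: int, exponent: int) -> int:
    return base ** exponent


with ThreadPoolExecutor(max_workers=3) as pool:
    # One argument from each iterable; stops at the shorter list
    results = list(pool.map(power, [2, 3, 4], [5, 2]))

print(results)  # [32, 9]
```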


```python
executor = ThreadPoolExecutor(max_workers=5)

# map() submits every task at once and yields results in input order
for i, result in enumerate(executor.map(run, [4, 2, 3, 1], timeout=3.5)):
    print(f"Task {i} Result: {result}")
```

```
Delay Task 1 Finished
Delay Task 2 Finished
Delay Task 3 Finished

TimeoutError: 
```

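The first input (delay 4) cannot finish within the 3.5 s timeout, so `map()` raises `TimeoutError` when the loop asks for that first result, and the exception stops the loop and the cell. The task itself keeps running in its worker thread.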
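To keep the program running after a timeout, catch the exception around the loop. A minimal sketch, assuming an event-gated task (hypothetical helper `wait_then_return`) in place of `time.sleep()`; the timeout only ends the iteration and does not stop tasks already running in worker threads, so the sketch releases them before the pool shuts down.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import threading

release = threading.Event()


def wait_then_return(x: int) -> int:
    release.wait()  # simulates a task slower than the timeout
    return x


collected = []
with ThreadPoolExecutor(max_workers=2) as pool:
    try:
        for result in pool.map(wait_then_return, [1, 2], timeout=0.2):
            collected.append(result)
    except concurrent.futures.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    finally:
        # Running workers are not interrupted by the timeout,
        # so release them before the with-block waits for shutdown
        release.set()

print(collected, timed_out)  # [] True
```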

```python
executor = ThreadPoolExecutor(max_workers=5)

for i, result in enumerate(executor.map(run, [4, 2, 3, 1], timeout=5)):
    print(f"Task {i} Result: {result}")
```

```
Delay Task 1 Finished
Delay Task 2 Finished
Delay Task 3 Finished
Delay Task 4 Finished
Task 0 Result: 4
Task 1 Result: 2
Task 2 Result: 3
Task 3 Result: 1
```


By contrast, when every task finishes within `timeout`, the loop runs to completion and the following code executes normally.

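* With `map()`, there is no need to call `submit()` for each task.
* Unlike `as_completed()`, `map()` yields results in submission (input) order, whatever order the tasks finish in: each result is yielded only once it and every earlier result are available. In the run above the first task is the slowest, so all four results appear together after about 4 s.
* So the results returned by `map()` are ordered, but the tasks themselves may finish in any order.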
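The difference between completion order and result order can be made visible without `sleep()`. A minimal sketch, assuming a chain of `threading.Event` objects (hypothetical setup) that forces the tasks to finish in the order 1, 2, 3 even though they are submitted as 3, 1, 2:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Hypothetical setup: task v may finish only after task v-1 has finished,
# which forces the completion order 1, 2, 3 regardless of submission order
finished_events = {v: threading.Event() for v in range(4)}
finished_events[0].set()
completion_order = []


def ordered_task(v: int) -> int:
    finished_events[v - 1].wait()
    completion_order.append(v)
    finished_events[v].set()
    return v * 10


# max_workers=3 so all three tasks can run (and wait) at the same time
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(ordered_task, [3, 1, 2]))

print(completion_order)  # [1, 2, 3]: order in which tasks finished
print(results)           # [30, 10, 20]: order in which inputs were given
```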
