future 指一种对象，表示异步执行的操作。

并发下载，每次下载的顺序都不同。

在 I/O 密集型应用中，如果代码写得正确，那么不管使用哪种并发策略（线程或 asyncio 包），吞吐量都比顺序执行的代码高很多。

In [1]:
# flags.py
import os
import time
import sys

import requests  # 非标准库

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()  # 列出人口最多的 20 个国家的 ISO 3166 国家代码

BASE_URL = 'http://flupy.org/data/flags'  # 获取国旗图像的网站

DEST_DIR = 'downloads/'  # 保存的目录


# 保存图像
def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


# 下载图像
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content  # 二进制内容


# 显示字符串，空格分隔
def show(text):
    print(text, end=' ')
    sys.stdout.flush()


# 顺序下载
def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)


# 下载并计时
def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main()

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 21.24s


concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和 ProcessPoolExecutor 类，这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进程池，以及要执行的任务队列。

编写并发代码时经常这样重构：把依序执行的 for 循环体改成函数，以便并发调用。

In [2]:
# flags_threadpool.py
import os
import time
import sys
from concurrent import futures  # <1>

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = 'downloads/'

MAX_WORKERS = 20  # 最多使用 20 个线程


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


def download_one(cc):  # <3>
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))  # 设定工作的线程数量
    with futures.ThreadPoolExecutor(workers) as executor:  # 在所有线程完执行完毕前阻塞线程
        res = executor.map(download_one, sorted(cc_list))  # 在多个线程中并发调用

    return len(list(res))  # 如果有线程抛出异常，则在此抛出


def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main()

BD BR CN DE EG FR ID JP IN RU TR VN CD ET MX PK IR NG PH US 
20 flags downloaded in 2.11s


标准库中有两个名为 Future 的类：concurrent.futures.Future 和 asyncio.Future 。这两个 Future 类的实例都表示可能已经完成或尚未完成的延迟计算。

future 封装待完成的操作，可以放入队列，完成的状态可以查询，得到结果（或抛出异常）后可以获取结果（或异常）。

通常情况下不应爱创建 future ，而只能由并发框架（concurrent.futures 或 asyncio）实例化，future 表示终将发生的事情，而确定某件事会发生的唯一方式是执行的时间已经排定。
- 只有排定把某件事交给 concurrent.futures.Executor 子类处理时，才会创建 concurrent.futures。Future 实例

这两种 future 都有 .done() 方法，这个方法不阻塞，返回值是布尔值，指明 future 链接的可调用对象是否已经执行。

两个 Future 类都有 .add_done_callback() 方法：这个方法只有一个参数，类型是可调用对象，future 运行结束后会调用指定的可调用对象

两个 Future 类都有 .result 方法
- 在 future 运行结束后调用：返回可调用对象的结果，或者重新跑出执行可调用对象时抛出的异常
- future 没有运行结束：
    - concurrent.futures.Future 实例调用 f.result() 方法会阻塞调用方所在的线程，直到有结果返回。此时，result 方法可以接收可选的 timeout 参数，如果在指定的时间内 future 没有运行完毕，会抛出 TimeoutError 异常。
    - asyncio.Future.result 方法不支持设定超时时间，在那个库汇中获取 future 的结果最好使用 yield form 结构。
    
    
两个库中有几个函数会返回 future，其他函数则使用 future 。Executor.map 方法属于后者：返回值是一个迭代器，迭代器的 \_\_next__ 方法调用各个 future 的 result 方法，因此得到的是各个 future 的结果，而非 future 本身。

concurrent.futures.as_completed 函数的参数是一个 future 列表，返回值是一个迭代器，在 future 运行结束后产出 future 。

修改 flags_threadpool.py ，把 download_many 函数中的 executor.map 方法换成 executor.submit 方法和 futures.as_completed 函数。

future 的 repr() 方法会显示 future 的状态
- running：正在工作
- pending：等待可用线程

使用 concurrent.futures 库实现的示例受全局解释器锁（Global Interpreter Lock, GIL）的限制。

In [3]:
# flags_threadpool_ac.py
from concurrent import futures

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


def show(text):
    print(text, end=' ')
    sys.stdout.flush()

    
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    cc_list = cc_list[:5]  # 人口最多的 5 个国家
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # 3 个线程
        to_do = []
        for cc in sorted(cc_list):  # 按字母顺序迭代
            future = executor.submit(download_one, cc)  # 排定并返回 future
            to_do.append(future)  # 存储 future
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # 国家，对应的 future

        results = []
        for future in futures.as_completed(to_do):  # 不阻塞，返回 future
            res = future.result()  # future 运行结果
            msg = '{} result: {!r}'
            print(msg.format(future, res)) # future 及其结果
            results.append(res)

    return len(results)


def main():  # <10>
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    

if __name__ == '__main__':
    main()

Scheduled for BR: <Future at 0x2a3ad25eac0 state=running>
Scheduled for CN: <Future at 0x2a3ad272be0 state=running>
Scheduled for ID: <Future at 0x2a3ad284310 state=running>
Scheduled for IN: <Future at 0x2a3ad284190 state=pending>
Scheduled for US: <Future at 0x2a3ad27b130 state=pending>
ID CN BR <Future at 0x2a3ad284310 state=finished returned str> result: 'ID'
<Future at 0x2a3ad272be0 state=finished returned str> result: 'CN'
<Future at 0x2a3ad25eac0 state=finished returned str> result: 'BR'
US <Future at 0x2a3ad27b130 state=finished returned str> result: 'US'
IN <Future at 0x2a3ad284190 state=finished returned str> result: 'IN'

5 flags downloaded in 2.16s


In [4]:
# flags_asyncio.py
import os
import time
import sys
import asyncio  # <1>

import aiohttp  # <2>


POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = 'downloads/'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def get_flag(session, cc):  # <3>
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with session.get(url) as resp:        # <4>
        return await resp.read()  # <5>


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


async def download_one(session, cc):  # <6>
    image = await get_flag(session, cc)  # <7>
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def download_many(cc_list):
    async with aiohttp.ClientSession() as session:  # <8>
        res = await asyncio.gather(                 # <9>
            *[asyncio.create_task(download_one(session, cc))
                for cc in sorted(cc_list)])

    return len(res)

# def main():  # <10>
#     t0 = time.time()
#     count = asyncio.run(download_many(POP20_CC))
#     elapsed = time.time() - t0
#     msg = '\n{} flags downloaded in {:.2f}s'
#     print(msg.format(count, elapsed))

async def main():  # <10>
    t0 = time.time()
    count = await download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    

if __name__ == '__main__':
    await main()

VN CN IN FR NG JP TR DE BR ID BD EG PH US ET MX PK CD IR RU 
20 flags downloaded in 1.67s


Cpython 解释器本身就不是线程安全的，因此有全局解释器锁（GIL），一次只允许使用一个线程执行 Python 字节码。

标准库中所有执行阻塞型 I/O 操作的函数，在等待操作系统返回结果时都会释放 GIL 。I/O 密集型 Python 程序能够从中受益：一个 Python 线程等待网络响应时，阻塞性 I/O 函数会释放 GIL ，再运行一个线程

concurrent.futures 模块实现的是真正的并行计算，它使用 ProcessPoolExecutor 类把工作分配给多个 Python 进程处理。

ProcessPoolExecutor 和 ThreadPoolExecutor 类都实现了通用的 Executor 接口，因此使用 concurrent.futures 模块能轻松地把基于线程的方案改为基于进程的方案。

这两个实现 Executor 接口的类唯一值得注意的区别是：ThreadPoolExecutor.\_\_init__ 方法需要 max_workers 参数，指定线程池中线程的数量；ProcessPoolExecutor 默认值是 os.cpu_count() 函数返回的 CPU 数量。

ProcessPoolExecutor 的价值体现在 CPU 密集型作业上。

如果使用 Python 处理 CPU 密集型工作，应该试试 PyPy

In [5]:
# arcfour_futures.py
import sys
import time
from concurrent import futures
from random import randrange
from arcfour import arcfour_test

JOBS = 12
SIZE = 2**18

KEY = b"'Twas brillig, and the slithy toves\nDid gyre"
STATUS = '{} workers, elapsed time: {:.2f}s'


def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()

    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = []
        for i in range(JOBS, 0, -1):
            size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
            job = executor.submit(arcfour_test, size, KEY)
            to_do.append(job)

        for future in futures.as_completed(to_do):
            res = future.result()
            print('{:.1f} KB'.format(res/2**10))

    print(STATUS.format(actual_workers, time.time() - t0))


if __name__ == '__main__':
    if len(sys.argv) == 2:
        workers = int(sys.argv[1])
    else:
        workers = None
    main(workers)

149.3 KB
170.7 KB
192.0 KB
213.3 KB
234.7 KB
256.0 KB
277.3 KB
298.7 KB
320.0 KB
341.3 KB
362.7 KB
384.0 KB
12 workers, elapsed time: 1.41s


In [6]:
# sha_futures.py
# 使用 hashlib 实现 SHA-256 算法
import sys
import time
from concurrent import futures
from sha import sha

JOBS = 12
SIZE = 2**20
STATUS = '{} workers, elapsed time: {:.2f}s'


def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()

    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = (executor.submit(sha, SIZE) for i in range(JOBS))
        for future in futures.as_completed(to_do):
            res = future.result()
            print(res)

    print(STATUS.format(actual_workers, time.time() - t0))

if __name__ == '__main__':
    if len(sys.argv) == 2:
        workers = int(sys.argv[1])
    else:
        workers = None
    main(workers)

d383d59bf59532702669aa9519b01670456f87ff64acb8bf854019e6be6ba432
925103567ea09e84a2ffe09fef8e7be4e8ae1e66d51e15f0aedd74abe805ecc7
c958b095c686a578d3a1410f9d6749aa9bb8c0c4af978f1b1f5fd0a643df737e
d2c9c0be744370852c4a7377c7e709bb0d580901851ad2b57615addb10047d46
49b00d71b76bf8032d4f6248e5b7a3f53893b09c8f29807501c49475edb88b03
ea60116b3385eff45819b8aa690fe32111cddd06565f18ede4bad6ae15891e88
ca5eac96f853bfb28d5c77a83b73bfc651e29f97d183a8af847ba23df5fe8e7a
8151a51f34d05d9ddf4e0f5898def85f38a04447f030046b336fda13bc796c85
b2e6b671d0fb60a58690dd929dfe01129dc53580c4007f8b2af3794fb733dd18
1f529e6ef92b4c5b251e6e4be361ddcb2ac6c3a4f9bb0611f3b5e6d3b8e65e82
515963004c4961459da026981a31e877603ff7971fcfa3c07be0883942cff54f
be9594e126fefb2cfe021baadeec34a1a99146d58299843b50f8272f5eb64d59
12 workers, elapsed time: 1.84s


Executor.map 函数返回结果的顺序与调用开始的顺序一致；

executor.sumbit 和 futures.as_completed 组合比 executor.map 更灵活
- submit 方法能处理不同的可调用对象和参数
- map 方法只能处理参数不同的同一个可调用对象
- 传给 futures.as_completed 函数的 future 集合可以来自多个 Executor 实例

In [7]:
from time import sleep, strftime
from concurrent import futures


def display(*args):  # 打印传入的参数
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):  # 打印消息，休眠 n 秒，再打印消息
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10


def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)  # 3 个线程
    results = executor.map(loiter, range(5))  # 提交任务
    display('results:', results)  # 返回生成器
    display('Waiting for individual results:')
    for i, result in enumerate(results):  # 隐式调用 next(result)，result 方法会阻塞，直到 future 运行结束
        display('result {}: {}'.format(i, result))


main()

[14:25:07] Script starting.
[14:25:07] loiter(0): doing nothing for 0s...
[14:25:07] [14:25:07] 	loiter(1): doing nothing for 1s...
loiter(0): done.
[14:25:07] [14:25:07]		loiter(2): doing nothing for 2s...
[14:25:07] 			loiter(3): doing nothing for 3s...
 results: <generator object Executor.map.<locals>.result_iterator at 0x000002A3AD5F7190>
[14:25:07] Waiting for individual results:
[14:25:07] result 0: 0
[14:25:08] 	loiter(1): done.
[14:25:08][14:25:08]  				loiter(4): doing nothing for 4s...result 1: 10

[14:25:09] 		loiter(2): done.
[14:25:09] result 2: 20
[14:25:10] 			loiter(3): done.
[14:25:10] result 3: 30
[14:25:12] 				loiter(4): done.
[14:25:12] result 4: 40


处理错误：
- flags2_common.py：通用函数和设置
- flags2_sequential.py：HTTP 顺序下载客户端
- flags2_threadpool.py：HTTP 并发客户端
- flags2_asyncio.py：HTTP 并发客户端

tqdm 函数能处理任何可迭代对象，生成一个迭代器；使用这个迭代器时，显示进度条和完成全部迭代预计的剩余时间。

In [8]:
import time
from tqdm import tqdm

for i in tqdm(range(1000)):
    time.sleep(.01)

100%|█████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:10<00:00, 94.54it/s]


In [9]:
# LOCAL 服务器，人口最多的 20 国国旗，1 个并发连接
%run -i flags2_sequential.py

  0%|                                                                                                          | 0/20 [00:00<?, ?it/s]

LOCAL site: http://localhost:8001/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.


100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [01:20<00:00,  4.01s/it]

--------------------
0 flags downloaded.
20 errors.
Elapsed time: 80.23s





In [10]:
# DELAY 服务器，以 A/B/C 开头的所有国旗
%run -i flags2_threadpool.py -s DELAY a b c

  0%|                                                                                                          | 0/78 [00:00<?, ?it/s]

DELAY site: http://localhost:8002/flags
Searching for 78 flags: from AA to CZ
30 concurrent connections will be used.


100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 78/78 [00:12<00:00,  6.49it/s]

--------------------
0 flags downloaded.
78 errors.
Elapsed time: 12.08s





In [11]:
# ERROR 服务器，100 面国旗，100 个并发连接
# %run -i flags2_asyncio.py -s ERROR -al 100 -m 100

![](flags2.png)

对 futures.as_completed 函数特别有用的惯用法：构建一个字典，把各个 future 映射到其他数据（future 运行结束后可能有用）上。

对 CPU 密集型工作来说，要启动多个进程，规避 GIL

threading 模块、multiprocessing 模块

Python 3 把 thread 模块重命名为 \_thread ，以此强调这是底层实现，不应该在应用代码中使用。

Celery 是一个任务队列，用于把工作分配给多个线程和进程，甚至是不同的设备。

对于 CPU 密集型和数据密集型并行处理，可以用分布式计算引擎 Apache Spark