In [None]:
# 由于需要测试 niginx 环境，因此需要在本地环境运行相关的程序
!mkdir ch17
!mkdir ch17/downloads
!touch ch17/__init__.py

mkdir: 无法创建目录"ch17": 文件已存在


## 17.1 示例： 网络下载的三种风格

- 网络有很高的延迟，为了高效地处理网络 I/O，需要使用并发
  - 与其浪费 CPU 周期去等待，不如在等待网络响应期间做些其它事情

- 示例 17.2 flags.py: 依序下载的脚本；另外两个脚本会重用其中几个函数

In [155]:
%%writefile ch17/flags.py
import os
import time
import sys

import requests

# ISO 3166 codes of the 20 most populous countries, in descending order of population
POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR ').split()

# Local test server; flags2_common.py lists this same port as its 'DELAY' server
BASE_URL = 'http://localhost:8002/flags'

# Flags are saved into a downloads/ directory next to this script
DEST_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)),'downloads/') 


def save_flag(img, filename):
  """
  Write the bytes `img` to DEST_DIR/filename, overwriting any existing file.
  """
  target_path = os.path.join(DEST_DIR, filename)
  with open(target_path, 'wb') as out_file:
    out_file.write(img)


def get_flag(cc):
  """
  Fetch the GIF flag for country code `cc` from BASE_URL and return the
  response body as bytes. The HTTP status code is not checked here.
  """
  lower_cc = cc.lower()
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=lower_cc)
  return requests.get(url).content


def show(text):
  """
  Print `text` followed by a space instead of a newline and flush stdout
  immediately, so progress appears on a single line as items finish.
  """
  print(text, end=' ', flush=True)


def download_many(cc_list):
  """
  Download each flag in cc_list one after another, in alphabetical order,
  save it as <cc>.gif and return how many country codes were processed.
  """
  for code in sorted(cc_list):
    flag_bytes = get_flag(code)
    show(code)
    save_flag(flag_bytes, '{}.gif'.format(code.lower()))

  return len(cc_list)


def main(download_many):
  """
  Call download_many(POP20_CC), then print the number of flags it reports
  and the wall-clock time the call took.
  """
  start = time.time()
  count = download_many(POP20_CC)
  print('\n{} flags download in {:.2f}s'.format(count, time.time() - start))


if __name__ == '__main__':
  main(download_many)

Overwriting ch17/flags.py


In [156]:
# Sequential version: download the 20 flags one at a time
!python ch17/flags.py

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags download in 10.17s


###### 示例 17.3 flags_threadpool.py: 使用 futures.ThreadPoolExecutor 类实现多线程下载的脚本

In [157]:
%%writefile ch17/flags_threadpool.py
from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20  # upper bound on the number of threads ThreadPoolExecutor may use


def download_one(cc):
  """
  Download, report and save the flag for one country code; this is the unit
  of work each pool thread runs. Returns `cc` so the caller can count results.
  """
  flag_bytes = get_flag(cc)
  show(cc)
  save_flag(flag_bytes, '{}.gif'.format(cc.lower()))
  return cc


def download_many(cc_list):
  """
  Download every flag in cc_list concurrently on a thread pool and return
  how many downloads produced a result.
  """
  # never start more threads than there are flags to fetch
  thread_count = min(MAX_WORKERS, len(cc_list))
  print('running')
  # leaving the with-block calls executor.shutdown(wait=True), which blocks
  # until every submitted call has finished
  with futures.ThreadPoolExecutor(thread_count) as executor:
    # map() returns an iterator that yields results in submission order
    outcomes = executor.map(download_one, sorted(cc_list))

  # draining the iterator re-raises an exception raised in a worker, if any
  return sum(1 for _ in outcomes)


if __name__ == '__main__':
  main(download_many)

Overwriting ch17/flags_threadpool.py


In [158]:
# Thread-pool version of the same 20 downloads
!python ch17/flags_threadpool.py

running
EG CD BD CN IN ET FR TR IR JP ID MX VN NG BR US PH PK RU DE 
20 flags download in 0.56s


- 当网络有延迟时，并发下载要远快于依序下载

### 17.1.3 future 在哪里

- 标准库中有两个名为 `Future` 的类：`concurrent.futures.Future` 和 `asyncio.Future`
  - 这两个类作用相同：两个 `Future` 类的实例均可表示已经完成或者尚未完成的延迟计算
  - 两种 `future` 都有 `.done()` 方法，此方法不阻塞，返回布尔值，指明 `future` 关联的可调用对象是否已经执行完毕
  - 两个 `Future` 类都有 `.add_done_callback()` 方法
    - 此方法只有一个参数，类型是可调用对象， `future` 运行结束后会调用指定的可调用对象 
  - 都有 `.result()` 方法
    - 在 `future` 运行结束后调用，此方法在两个 `Future` 类中的作用相同
      - 返回可调用对象的结果
      - 或者重新抛出执行可调用对象时抛出的异常
    - 如果 `future` 没有运行结束， `result` 方法在两个 `Future` 类中的行为差别很大
      - 对于 `concurrent.futures.Future` 实例来说，调用 `f.result()` 方法会阻塞调用方所在的线程，直到有结果返回
        - 还可接收可选的 `timeout` 参数，如果指定的时间内 `future` 没有运行完毕，会抛出 `TimeoutError` 异常 
      - `asyncio.Future.result` 方法不支持设定超时时间，且在 `future` 尚未完成时调用会抛出 `asyncio.InvalidStateError` 异常；在此库中获取 `future` 的结果，最好使用 `yield from` 结构（Python 3.5 及以后的版本中使用 `await`）
  - 这两个库中有几个函数会返回 `future`， 其它函数则使用 `future`
    - `Executor.map` 方法返回值是一个迭代器，其 `__next__` 方法调用各个 `future` 的 `result` 方法
    - `concurrent.futures.as_completed` 的函数的参数是一个 `future` 列表，返回值是一个迭代器，在 `future` 运行结束后产出 `future`
- 通常情况下，自己不应创建 `future`， 而只能由并发框架 (concurrent.futures 或 asyncio) 实例化

###### 示例 17.4  `flags_threadpool_ac.py`： 把 `download_many` 函数中的 `executor.map` 方法换成 `executor.submit` 方法和 `futures.as_completed` 函数

In [None]:
%%writefile ch17/flags_threadpool_ac.py
from concurrent import futures

from flags import save_flag, get_flag, show, main
from flags_threadpool import download_one

BASE_URL = 'http://localhost:8002/flags'  # not used in this script: get_flag, imported from flags, builds URLs from flags.BASE_URL


def download_many(cc_list):
  """
  Download the first five flags using executor.submit + futures.as_completed.

  Each future is printed when it is scheduled and again when its result
  arrives, so the output shows completion order rather than submission order.

  Returns:
    The number of results collected.

  Raises:
    Whatever download_one raised, re-raised by future.result().
  """
  cc_list = cc_list[:5]  # only five codes, to keep the printed output short
  with futures.ThreadPoolExecutor(max_workers=3) as executor:
    to_do = []
    for cc in sorted(cc_list):
      # submit() schedules the callable and immediately returns a Future
      future = executor.submit(download_one, cc)
      to_do.append(future)
      msg = 'Scheduled for {}:{}'
      print(msg.format(cc, future))  # shows the code and the future's current state

    # Consume results while the pool is still running. Leaving the with-block
    # first would call executor.shutdown(wait=True), which blocks until every
    # future has finished, so as_completed() could no longer reveal the
    # order in which the downloads completed.
    results = []
    for future in futures.as_completed(to_do):  # yields each future as it finishes
      res = future.result()  # never blocks here; re-raises the worker's exception
      msg = '{} result: {!r}'
      print(msg.format(future, res))
      results.append(res)

  return len(results)


if __name__ == '__main__':
  main(download_many)

Overwriting ch17/flags_threadpool_ac.py


In [None]:
# submit/as_completed version: first 5 codes only, 3 worker threads
!python ch17/flags_threadpool_ac.py

Scheduled for BR:<Future at 0x7f7af745e320 state=running>
Scheduled for CN:<Future at 0x7f7af745e9e8 state=running>
Scheduled for ID:<Future at 0x7f7af745ef98 state=running>
Scheduled for IN:<Future at 0x7f7af746e940 state=pending>
Scheduled for US:<Future at 0x7f7af746ef60 state=pending>
BR CN ID IN US <Future at 0x7f7af745e9e8 state=finished returned str> result: 'CN'
<Future at 0x7f7af745ef98 state=finished returned str> result: 'ID'
<Future at 0x7f7af746ef60 state=finished returned str> result: 'US'
<Future at 0x7f7af746e940 state=finished returned str> result: 'IN'
<Future at 0x7f7af745e320 state=finished returned str> result: 'BR'

5 flags download in 0.01s


## 17.2 阻塞型 I/O 和 GIL

- 严格来说，之前的脚本都不能并行下载：使用 `concurrent.futures` 库实现的两个多线程示例受 GIL (Global Interpreter Lock，全局解释器锁) 的限制
- `CPython` 解释器本身不是线程安全的，因此有全局解释器锁 (GIL)，一次只允许使用一个线程执行 Python 字节码
  - 一个 Python 进程通常不能使用多个 CPU 核心
- 标准库中所有执行阻塞型 I/O 操作的函数以及 `time.sleep()` 函数，在等待操作系统返回结果时都会释放 GIL
  - Python 在这个层次上可以使用多线程

## 17.3 使用 `concurrent.futures` 模块启动进程

- `concurrent.futures` 模块通过 `ProcessPoolExecutor` 类把工作分配给多个 Python 进程处理，从而实现真正的并行计算
  - 如果使用 CPU 密集型处理，使用这个模块能绕开 GIL，利用所有可用的 CPU 核心
- `ProcessPoolExecutor` 大多数情况下不用指定 `max_workers` 参数，其默认值为 `os.cpu_count()` 函数返回的 CPU 数量
- 如果使用 Python 处理 CPU 密集型工作，可以尝试 [PyPy](http://pypy.org)

###### 使用 `ProcessPoolExecutor` 类改写示例 17-3

In [159]:
%%writefile ch17/flags_processpool.py
from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20  # not used below: ProcessPoolExecutor() is created without max_workers, so it picks its default


def download_one(cc):
  """
  Download, report and save the flag for one country code. Under
  ProcessPoolExecutor this runs in a worker process, not a thread.
  Returns `cc` so the caller can count results.
  """
  flag_bytes = get_flag(cc)
  show(cc)
  save_flag(flag_bytes, '{}.gif'.format(cc.lower()))
  return cc


def download_many(cc_list):
  """
  Download every flag in cc_list on a pool of worker processes and return
  how many downloads produced a result.
  """
  print('running')
  # No max_workers argument, so the pool uses its default size. Leaving the
  # with-block calls executor.shutdown(wait=True), which waits for the workers.
  with futures.ProcessPoolExecutor() as executor:
    # results come back in submission order
    outcomes = executor.map(download_one, sorted(cc_list))

  # draining the iterator re-raises an exception raised in a worker process
  return sum(1 for _ in outcomes)


if __name__ == '__main__':
  main(download_many)

Overwriting ch17/flags_processpool.py


In [160]:
# Same downloads, now using a process pool
!python ch17/flags_processpool.py

running
CD BD BR CN EG ID DE IR FR ET IN JP US RU PK PH NG MX TR VN 
20 flags download in 1.06s


#### `ProcessPoolExecutor` 的价值体现在 CPU 密集型作业上

##### 纯粹使用 Python 实现 RC4 算法

###### 示例 A-8 arcfour.py：兼容 RC4 算法

In [None]:
%%writefile ch17/arcfour.py
"""兼容 RC4 的算法"""


def arcfour(key, in_bytes, loops=20):
  """
  Encrypt or decrypt `in_bytes` with an RC4-compatible stream cipher.

  The output is the input XORed with a keystream that depends only on `key`
  and `loops`, so applying the same call twice restores the input.

  Args:
    key: bytes-like key of at most 256 bytes (a longer key raises IndexError).
    in_bytes: iterable of ints in range(256), e.g. bytes or bytearray.
    loops: number of key-scheduling passes; loops=1 is classic RC4 and the
      default of 20 follows the CipherSaber-2 recommendation
      (http://ciphersaber.gurus.com/faq.html#cs2).

  Returns:
    A new bytearray with the transformed bytes.
  """
  # kbox holds the key repeated cyclically until it fills 256 slots
  kbox = bytearray(256)
  for pos, byte in enumerate(key):
    kbox[pos] = byte
  key_len = len(key)
  for pos in range(key_len, 256):
    kbox[pos] = kbox[pos - key_len]

  # Key scheduling: start from the identity permutation and shuffle it
  # `loops` times; j deliberately carries over from one pass to the next.
  sbox = bytearray(range(256))
  j = 0
  for _ in range(loops):
    for i in range(256):
      j = (j + sbox[i] + kbox[i]) % 256
      sbox[i], sbox[j] = sbox[j], sbox[i]

  # Keystream generation: one keystream byte per input byte
  out_bytes = bytearray()
  i = j = 0
  for byte in in_bytes:
    i = (i + 1) % 256
    j = (j + sbox[i]) % 256
    sbox[i], sbox[j] = sbox[j], sbox[i]
    keystream_byte = sbox[(sbox[i] + sbox[j]) % 256]
    out_bytes.append(byte ^ keystream_byte)

  return out_bytes


def test():
  """
  Time an arcfour encrypt/decrypt round trip of 1,000,000 bytes and check
  that decryption restores the original data.

  The first timing covers encryption only; the second covers encryption
  plus decryption (both are measured from the same start time).
  """
  from time import time
  clear = bytearray(b'1234567890' * 100000)
  t0 = time()
  cipher = arcfour(b'key', clear)
  print('elapsed time: %.2fs' % (time() - t0))  # encryption only
  result = arcfour(b'key', cipher)
  # On failure, report the lengths and the first differing offset instead of
  # dumping the reprs of two 1 MB bytearrays into the assertion message.
  assert result == clear, (
      'round trip failed: got {} bytes, expected {}; first difference at offset {}'
      .format(len(result), len(clear),
              next((i for i, (a, b) in enumerate(zip(result, clear)) if a != b),
                   min(len(result), len(clear)))))
  print('elapsed time: %.2fs' % (time() - t0))  # encryption + decryption
  print('OK')


if __name__ == '__main__':
  test()


Overwriting ch17/arcfour.py


In [None]:
# Time encrypting, then encrypting + decrypting, 1,000,000 bytes in pure Python
!python ch17/arcfour.py

elapse time: 0.28s
elapsed time: 0.55s
OK


###### 示例 A-7 arcfour_futures.py: futures.ProcessPoolExecutor 用法示例

In [None]:
%%writefile ch17/arcfour_futures.py
import sys
import time
from concurrent import futures
from random import randrange
from arcfour import arcfour

JOBS = 12  # number of arcfour_test jobs submitted to the pool
SIZE = 2 ** 18  # nominal job size in bytes; main() spreads the actual sizes around it

# fixed cipher key used by every job; STATUS is the report line printed by main()
KEY = b"'Twas brilling, and the slithy toves\nDid gyre"
STATUS = '{} workers, elapsed time: {:.2f}s'


def arcfour_test(size, key):
  """
  Encrypt and then decrypt `size` random bytes with `key`, assert that the
  round trip is lossless, and return `size`.
  """
  plain = bytearray(randrange(256) for _ in range(size))
  decrypted = arcfour(key, arcfour(key, plain))
  assert plain == decrypted, 'Failed arcfour_test'
  return size


def main(workers=None):
  """
  Run JOBS arcfour round-trip jobs of varying size on a process pool, then
  print the number of worker processes and the elapsed wall-clock time.

  Args:
    workers: number of worker processes; None (or 0) is passed through
      unchanged to ProcessPoolExecutor.
  """
  workers = int(workers) if workers else workers
  t0 = time.time()

  with futures.ProcessPoolExecutor(workers) as executor:
    actual_workers = executor._max_workers  # private attribute: the resolved pool size
    # job sizes run from 1.5 * SIZE down to about 0.58 * SIZE, largest first
    to_do = [executor.submit(arcfour_test, SIZE + int(SIZE / JOBS * (i - JOBS/2)), KEY)
             for i in range(JOBS, 0, -1)]

    for future in futures.as_completed(to_do):
      future.result()  # re-raise any failure from a worker process

    print(STATUS.format(actual_workers, time.time() - t0))


if __name__ == '__main__':
  workers = int(sys.argv[1]) if len(sys.argv) == 2 else None
  main(workers)


Overwriting ch17/arcfour_futures.py


In [None]:
 # Run the arcfour benchmark with 1, 2, 3 and 4 worker processes
 !for i in `seq 1 4`; do python ch17/arcfour_futures.py $i; done

1 workers, elapsed time: 3.64s
2 workers, elapsed time: 1.81s
3 workers, elapsed time: 1.70s
4 workers, elapsed time: 0.93s


##### 使用 `hashlib` 模块实现的 SHA-256 算法

###### 示例 A-9 sha_futures.py:  futures.ProcessPoolExecutor 用法示例

In [None]:
%%writefile ch17/sha_futures.py
import sys
import time
import hashlib
from concurrent import futures
from random import randrange

JOBS = 12  # number of sha() jobs submitted to the pool
SIZE = 2 ** 20  # bytes of random data hashed by each job
STATUS = '{} workers, elapse time: {:.2f}s'  # report line printed by main()


def sha(size):
  """Return the hex SHA-256 digest of `size` freshly generated random bytes."""
  payload = bytearray(randrange(256) for _ in range(size))
  return hashlib.sha256(payload).hexdigest()


def main(workers=None):
  """
  Hash JOBS buffers of SIZE random bytes on a process pool, then print the
  number of worker processes and the elapsed wall-clock time.

  Args:
    workers: number of worker processes; None (or 0) is passed through
      unchanged to ProcessPoolExecutor.
  """
  workers = int(workers) if workers else workers
  t0 = time.time()

  with futures.ProcessPoolExecutor(workers) as executor:
    actual_works = executor._max_workers  # private attribute: the resolved pool size
    # a generator is fine here: as_completed() consumes it, submitting every job
    pending = (executor.submit(sha, SIZE) for _ in range(JOBS))
    for done in futures.as_completed(pending):
      done.result()  # re-raise any failure from a worker process

  print(STATUS.format(actual_works, time.time() - t0))


if __name__ == '__main__':
  workers = int(sys.argv[1]) if len(sys.argv) == 2 else None
  main(workers)


Overwriting ch17/sha_futures.py


In [None]:
# Same scaling experiment with the hashlib-based SHA-256 jobs
!for i in `seq 1 4`; do python ch17/sha_futures.py $i; done

1 workers, elapse time: 6.91s
2 workers, elapse time: 3.45s
3 workers, elapse time: 3.04s
4 workers, elapse time: 2.10s


## 17.4 实验 `Executor.map` 方法

- `Executor.map` 函数返回结果的顺序与调用开始的顺序一致
  - 如果后调用的任务先完成，仍然等先开始调用的任务完成后，再返回结果
- 更通用的做法是，不管提交的顺序，只要有结果就获取
  - 如此，需要把 `Executor.submit` 方法和 `futures.as_completed` 函数结合起来使用
- `submit` 方法能处理不同的可调用对象和参数，而 `executor.map` 只能处理参数不同的同一个可调用对象，即 `submit` 更灵活

###### 示例 17-6 demo_executor_map.py: 简单演示 `ThreadPoolExecutor` 类的 `map` 方法

In [161]:
%%writefile ch17/demo_executor_map.py
from time import sleep, strftime
from concurrent import futures


def display(*args):
  """
  Print args the way print() would, prefixed with a '[HH:MM:SS]' timestamp.
  """
  stamp = strftime('[%H:%M:%S]')
  print(stamp, end=' ')
  print(*args)


def loiter(n):
  """
  Announce the call, sleep n seconds, announce completion and return n * 10.
  Messages are indented by n tabs so concurrent calls line up in columns.
  """
  indent = '\t' * n
  display('{}loiter({}): doing nothing for {} s...'.format(indent, n, n))
  sleep(n)
  display('{}loiter({}): done.'.format(indent, n))
  return n * 10


def main():
  """
  Demonstrate that Executor.map returns at once, and that iterating over its
  result yields values in call order, each step waiting for that call.
  """
  display('Script starting.')
  # The with-block shuts the pool down (joining its threads) once the results
  # have been consumed; previously the executor was never shut down explicitly.
  with futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(loiter, range(5))  # non-blocking: returns a generator immediately
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):  # each step blocks until loiter(i) has finished
      display('result {}: {}'.format(i, result))


if __name__ == '__main__':
  main()

Overwriting ch17/demo_executor_map.py


In [162]:
# Watch when each loiter call starts and finishes, and when its result is reported
!python ch17/demo_executor_map.py

[07:39:35] Script starting.
[07:39:35] loiter(0): doing nothing for 0 s...
[07:39:35] 	loiter(1): doing nothing for 1 s...
[07:39:35] loiter(0): done.
[07:39:35] 		loiter(2): doing nothing for 2 s...
[07:39:35] results: <generator object Executor.map.<locals>.result_iterator at 0x7fce67064150>
[07:39:35] 			loiter(3): doing nothing for 3 s...
[07:39:35] Waiting for individual results:
[07:39:35] result 0: 0
[07:39:36] 	loiter(1): done.
[07:39:36] 				loiter(4): doing nothing for 4 s...
[07:39:36] result 1: 10
[07:39:37] 		loiter(2): done.
[07:39:37] result 2: 20
[07:39:38] 			loiter(3): done.
[07:39:38] result 3: 30
[07:39:40] 				loiter(4): done.
[07:39:40] result 4: 40


- 下面是另一次运行的输出（时间戳与上面的不同），注释说明了各行出现的原因

```shell
[17:02:02] Script starting.
[17:02:02] loiter(0): doing nothing for 0 s...  
[17:02:02] loiter(0): done.  # 第一个线程在第二个线程开始之前就结束了
[17:02:02] 	loiter(1): doing nothing for 1 s...  # loiter(1) 和 loiter(2) 立即开始
[17:02:02] 		loiter(2): doing nothing for 2 s...
[17:02:02] 			loiter(3): doing nothing for 3 s...  # loiter(0) 运行结束，有线程空闲，loiter(3) 开始运行
[17:02:02] results: <generator object Executor.map.<locals>.result_iterator at 0x7f297fac05c8>   # executor.map 方法返回的结果 (result) 是生成器，目前不会阻塞
[17:02:02] Waiting for individual results:
[17:02:02] result 0: 0  # 此时执行过程可能阻塞， results 生成器的 __next__ 方法必须等待当前的 future 运行结束，此点之前的所有事件在同一时刻发生，即 [17:02:02]
[17:02:03] 	loiter(1): done.  # 1s 钟后， loiter(1) 运行结束
[17:02:03] 				loiter(4): doing nothing for 4 s...  # 由于 loiter(1) 运行完毕，此线程闲置，开始运行 loiter(4)
[17:02:03] result 1: 10  # 显示 loiter(1) 的运行结果，此时， for 循环会阻塞，等待 loiter(2) 的运行结果
[17:02:04] 		loiter(2): done.  
[17:02:04] result 2: 20
[17:02:05] 			loiter(3): done.
[17:02:05] result 3: 30
[17:02:07] 				loiter(4): done.
[17:02:07] result 4: 40
```

## 17.5 显示下载进度并处理错误

###### 使用 [tqdm](https://github.com/tqdm/tqdm) 包显示文本动画进度条

In [None]:
!pip install tqdm

Collecting tqdm
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/f3/76/4697ce203a3d42b2ead61127b35e5fcc26bba9a35c03b32a2bd342a4c869/tqdm-4.46.1-py2.py3-none-any.whl (63kB)
[K    100% |████████████████████████████████| 71kB 3.2MB/s 
[?25hInstalling collected packages: tqdm
Successfully installed tqdm-4.46.1


In [None]:
import time
from tqdm import tqdm

# range() supports len(), so tqdm knows the total and can estimate the remaining time
for _ in tqdm(range(1000)):
  time.sleep(.01)

100%|██████████| 1000/1000 [00:10<00:00, 96.62it/s]


- 为了能够预计剩余时间， `tqdm` 函数要获取一个能用 `len` 函数确定大小的可迭代对象

##### `flags2` 系列 HTTP 客户端示例

###### 示例 A-10 flags2_common.py

In [None]:
%%writefile ch17/flags2_common.py
"""
为后续的 flag 实例提供实用函数
"""

import argparse
from collections import namedtuple
from enum import Enum
import os
import sys
import string
import time

# Outcome of one download, e.g. Result(status, cc) as returned by download_one
Result = namedtuple('Result', 'status data')

# Download outcome categories; the Enum itself is named 'Status', so members print as Status.ok etc.
HTTPStatus = Enum('Status', 'ok not_found error')

# ISO 3166 codes of the 20 most populous countries: the default download list
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

# Not referenced in this module; each flags2 script defines its own values
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

# Test servers selectable with -s/--server
SERVERS = {
    'LOCAL': 'http://localhost:8001/flags',
    'DELAY': 'http://localhost:8002/flags',
    'ERROR': 'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'


# downloads/ directory and country_codes.txt (read by -a/--all), both next to this file
DEST_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'downloads/')
COUNTRY_CODES_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'country_codes.txt')


def save_flag(img, filename):
  """Write the bytes `img` to DEST_DIR/filename, overwriting any existing file."""
  target_path = os.path.join(DEST_DIR, filename)
  with open(target_path, 'wb') as out_file:
    out_file.write(img)


def inital_report(cc_list, actual_req, server_label):
  """
  Print the server being used, which flags will be searched for and how
  many concurrent connections will be opened.

  Up to 10 codes are listed individually; longer lists are shown as a
  'from X to Y' range.
  """
  n_codes = len(cc_list)
  if n_codes > 10:
    cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
  else:
    cc_msg = ', '.join(cc_list)
  print('{} site: {}'.format(server_label, SERVERS[server_label]))
  flag_suffix = '' if n_codes == 1 else 's'
  print('Searching for {} flag{}: {}'.format(n_codes, flag_suffix, cc_msg))
  conn_suffix = '' if actual_req == 1 else 's'
  print('{} concurrent connection{} will be used.'.format(actual_req, conn_suffix))


def final_report(cc_list, counter, start_time):
  """
  Print a summary of a download run.

  Args:
    cc_list: the requested country codes (not used by the report itself;
      kept for signature compatibility with the callers).
    counter: Counter mapping HTTPStatus members to the number of downloads
      that ended with that status.
    start_time: time.time() value taken right before downloading started.
  """
  elapsed = time.time() - start_time
  print('-' * 20)
  ok_count = counter[HTTPStatus.ok]
  plural = 's' if ok_count != 1 else ''
  print('{} flag{} downloaded.'.format(ok_count, plural))
  if counter[HTTPStatus.not_found]:
    print(counter[HTTPStatus.not_found], 'not found.')
  error_count = counter[HTTPStatus.error]
  if error_count:
    plural = 's' if error_count != 1 else ''
    # No space before the suffix: "1 error." / "28 errors." (was "28 error s.")
    print('{} error{}.'.format(error_count, plural))
  print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
  """
  Turn the command-line selection into a sorted list of country codes.

  every_cc selects every two-letter code AA..ZZ; otherwise all_cc reads the
  codes from COUNTRY_CODES_FILE; otherwise each item of cc_args (any case)
  is either one letter (expanded to that letter followed by A..Z) or a
  two-letter code. At most `limit` codes are returned.

  Raises:
    ValueError: an item of cc_args is neither A..Z nor AA..ZZ.
  """
  letters = string.ascii_uppercase
  if every_cc:
    codes = {first + second for first in letters for second in letters}
  elif all_cc:
    with open(COUNTRY_CODES_FILE) as fp:
      codes = set(fp.read().split())
  else:
    codes = set()
    for raw in cc_args:
      cc = raw.upper()
      if len(cc) == 1 and cc in letters:
        codes.update(cc + c for c in letters)
      elif len(cc) == 2 and all(c in letters for c in cc):
        codes.add(cc)
      else:
        msg = 'each CC argument must be A to Z or AA to ZZ.'
        raise ValueError('*** Usage error: ' + msg)
  return sorted(codes)[:limit]


def _usage_error(parser, msg):
  """Print msg and the parser's usage line, then exit with status 1."""
  print(msg)
  parser.print_usage()
  sys.exit(1)


def process_args(defalut_concur_req):
  """
  Parse and validate the command line shared by the flags2 scripts.

  Args:
    defalut_concur_req: default value for -m/--max_req. The misspelled
      parameter name is kept so existing keyword callers keep working.

  Returns:
    (args, cc_list): the parsed argparse namespace (with args.server
    upper-cased) and the country codes to download; sorted POP20_CC when
    the options select no codes.

  Invalid options print an error plus the usage line and exit with status 1.
  """
  server_options = ', '.join(sorted(SERVERS))
  parser = argparse.ArgumentParser(
      description='Download flags for country codes. '
      'Default: top 20 countries by population.'
  )
  # metavar only changes the placeholder shown in help/usage text;
  # action='store_true' turns an option into a flag that takes no value.
  # https://stackoverflow.com/questions/19124304/what-does-metavar-and-action-mean-in-argparse-in-python
  parser.add_argument('cc', metavar='CC', nargs='*',
                      help='country code or 1st letter (eg. B for BA...BZ)')
  parser.add_argument('-a', '--all', action='store_true',
                      help='get all available flags (AD...ZW)')
  parser.add_argument('-e', '--every', action='store_true',
                      help='get flags for every possible code (AA...ZZ)')
  parser.add_argument('-l', '--limit', metavar='N', type=int,
                      help='limit to N first codes', default=sys.maxsize)
  parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                      default=defalut_concur_req,
                      help='maximum concurrent requests (default={})'
                      .format(defalut_concur_req))
  parser.add_argument('-s', '--server', metavar='LABEL',
                      default=DEFAULT_SERVER,
                      help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
  parser.add_argument('-v', '--verbose', action='store_true',
                      help='output detailed progress info')
  args = parser.parse_args()
  if args.max_req < 1:
    _usage_error(parser, '*** Usage error: --max_req CONCURRENT must be >= 1')
  if args.limit < 1:
    _usage_error(parser, '*** Usage error: --limit N must be >= 1')
  args.server = args.server.upper()
  if args.server not in SERVERS:
    _usage_error(parser, '*** Usage error: --server LABEL must be one of '
                 + server_options)
  try:
    cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
  except ValueError as exc:
    # expand_cc_args puts a ready-made '*** Usage error: ...' text in args[0]
    _usage_error(parser, exc.args[0])
  if not cc_list:
    cc_list = sorted(POP20_CC)
  return args, cc_list


def main(download_many, defalut_concur_req, max_concur_req):
  """
  Shared driver for the flags2 scripts: parse the command line, report what
  will be downloaded, run download_many and print the final summary.

  Args:
    download_many: callable(cc_list, base_url, verbose, concur_req) that
      returns a Counter of download outcomes.
    defalut_concur_req: default for -m/--max_req.
    max_concur_req: hard cap on concurrent requests for the calling script.
  """
  args, cc_list = process_args(defalut_concur_req)
  # no more connections than requested, allowed by the script, or codes to fetch
  actual_req = min(args.max_req, max_concur_req, len(cc_list))
  inital_report(cc_list, actual_req, args.server)
  base_url = SERVERS[args.server]
  started = time.time()
  counter = download_many(cc_list, base_url, args.verbose, actual_req)
  # every requested code must be counted under exactly one outcome
  assert sum(counter.values()) == len(cc_list), \
    'some downloads are unaccounted for'
  final_report(cc_list, counter, started)


Overwriting ch17/flags2_common.py


### 17.5.1 flags2 系列示例处理错误的方式

###### 示例 A-11 flags2_sequential.py

In [None]:
%%writefile ch17/flags2_sequential.py
"""
下载多个国家的国旗 (包含错误处理代码)

依序下载版
"""
import collections

import requests
import tqdm

from flags2_common import main, save_flag, HTTPStatus, Result

# This version always downloads one flag at a time
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

# BEGIN FLAGS2_BASIC_HTTP_FUNCTIONS
def get_flag(base_url, cc):
  """
  Download the GIF flag for `cc` from `base_url` and return its bytes.

  Any non-200 response is passed to resp.raise_for_status(), which raises
  requests.exceptions.HTTPError for error statuses (download_one handles 404).
  """
  lower_cc = cc.lower()
  resp = requests.get('{}/{cc}/{cc}.gif'.format(base_url, cc=lower_cc))
  if resp.status_code != 200:
    resp.raise_for_status()
  return resp.content

def download_one(cc, base_url, verbose=False):
  """
  Download and save one flag, treating HTTP 404 as HTTPStatus.not_found.

  Returns:
    Result(status, cc).

  Raises:
    requests.exceptions.HTTPError for HTTP errors other than 404, and any
    other exception from get_flag; download_many counts these as errors.
  """
  try:
    image = get_flag(base_url, cc)
  except requests.exceptions.HTTPError as exc:
    if exc.response.status_code != 404:
      raise  # let the caller count every other HTTP error
    status, msg = HTTPStatus.not_found, 'not found'
  else:
    save_flag(image, cc.lower() + '.gif')
    status, msg = HTTPStatus.ok, 'OK'

  if verbose:  # -v/--verbose: print each code with its outcome
    print(cc, msg)

  return Result(status, cc)
# END FLAGS2_BASIC_HTTP_FUNCTIONS


# BEGIN FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
def download_many(cc_list, base_url, verbose, max_req):
  """
  Download the flags one at a time, in alphabetical order, and tally outcomes.

  Args:
    cc_list: country codes to fetch.
    base_url: server URL prefix passed to download_one.
    verbose: print per-flag messages instead of showing a tqdm progress bar.
    max_req: accepted for signature compatibility with the concurrent
      versions; not used here.

  Returns:
    collections.Counter keyed by HTTPStatus member.
  """
  counter = collections.Counter()
  cc_iter = sorted(cc_list)
  if not verbose:
    cc_iter = tqdm.tqdm(cc_iter)  # progress bar instead of per-flag messages
  for cc in cc_iter:
    error_msg = ''
    try:
      res = download_one(cc, base_url, verbose)
    except requests.exceptions.HTTPError as exc:
      # HTTP errors other than 404, which download_one re-raises
      error_msg = 'HTTP error {res.status_code} - {res.reason}'.format(res=exc.response)
    except requests.exceptions.ConnectionError:
      error_msg = 'Connection error'
    status = HTTPStatus.error if error_msg else res.status
    counter[status] += 1
    if verbose and error_msg:
      print('*** Error for {}:{}'.format(cc, error_msg))
  return counter
# END FLAGS2_DOWNLOAD_MANY_SEQUENTIAL


if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Overwriting ch17/flags2_sequential.py


###### 示例 17-9 全部使用默认值运行 flags2_sequential.py 脚本：LOCAL 服务器,人口最多的 20 国国旗，1 个并发连接

In [None]:
# All defaults: LOCAL server, the top-20 codes, 1 connection
!python3 ch17/flags2_sequential.py

LOCAL site: http://localhost:8001/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.
100%|██████████████████████████████████████████| 20/20 [00:00<00:00, 656.13it/s]
--------------------
20 flags downloaded.
Elapsed time: 0.03s


### 17.5.2 使用 `futures.as_completed` 函数

- `futures.as_completed` 函数的惯用法
  - 构建一个字典，把各个 `future` 映射到其他数据 (`future` 运行结束后可能有用) 上

###### 示例 17-14 flags2_threadpool.py：完整的代码清单

In [None]:
%%writefile ch17/flags2_threadpool.py
import collections
from concurrent import futures

import requests
import tqdm

from flags2_common import main, HTTPStatus
from flags2_sequential import download_one

DEFAULT_CONCUR_REQ = 30  # default for -m/--max_req: number of concurrent requests
MAX_CONCUR_REQ = 1000  # hard upper bound on concurrent requests, as a safety measure


def download_many(cc_list, base_url, verbose, concur_req):
  """
  Download the flags on a thread pool and tally the outcomes as they finish.

  Args:
    cc_list: country codes to fetch.
    base_url: server URL prefix passed to download_one.
    verbose: print per-flag messages instead of showing a tqdm progress bar.
    concur_req: number of worker threads; main() already capped it at
      MAX_CONCUR_REQ, -m/--max_req and len(cc_list), so no extra threads
      are created.

  Returns:
    collections.Counter keyed by HTTPStatus member.
  """
  counter = collections.Counter()
  with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
    # submit() schedules each call and returns a Future; keep the country
    # code for each Future so errors can be reported against it
    to_do_map = {executor.submit(download_one, cc, base_url, verbose): cc
                 for cc in sorted(cc_list)}
    done_iter = futures.as_completed(to_do_map)  # yields futures as they finish
    if not verbose:
      # as_completed() has no len(), so give tqdm the total explicitly
      done_iter = tqdm.tqdm(done_iter, total=len(cc_list))
    for future in done_iter:
      error_msg = ''
      try:
        # never blocks: as_completed only yields finished futures
        res = future.result()
      except requests.exceptions.HTTPError as exc:
        # HTTP errors other than 404, which download_one re-raises
        error_msg = 'HTTP error {res.status_code} - {res.reason}'.format(res=exc.response)
      except requests.exceptions.ConnectionError:
        error_msg = 'Connection error'
      status = HTTPStatus.error if error_msg else res.status
      counter[status] += 1
      if verbose and error_msg:
        print('*** Error for {}:{}'.format(to_do_map[future], error_msg))

  return counter


if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Overwriting ch17/flags2_threadpool.py


###### 示例 17-8 flags2 系列脚本的帮助界面

In [None]:
# Show the command-line help shared by the flags2 scripts
!python3 ch17/flags2_threadpool.py -h

usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                            [-v]
                            [CC [CC ...]]

Download flags for country codes. Default: top 20 countries by population.

positional arguments:
  CC                    country code or 1st letter (eg. B for BA...BZ)

optional arguments:
  -h, --help            show this help message and exit
  -a, --all             get all available flags (AD...ZW)
  -e, --every           get flags for every possible code (AA...ZZ)
  -l N, --limit N       limit to N first codes
  -m CONCURRENT, --max_req CONCURRENT
                        maximum concurrent requests (default=30)
  -s LABEL, --server LABEL
                        Server to hit; one of DELAY, ERROR,
                        LOCAL(default=LOCAL)
  -v, --verbose         output detailed progress info


###### 示例 17-10 运行 flags2_threadpool.py 脚本,从 DELAY 服务器中下载国家代码以 A、B 或 C 开头的所有国旗

In [None]:
# DELAY server: every code starting with A, B or C
!python3 ch17/flags2_threadpool.py -s DELAY a b c

DELAY site: http://localhost:8002/flags
Searching for 78 flags: from AA to CZ
30 concurrent connections will be used.
100%|███████████████████████████████████████████| 78/78 [00:01<00:00, 49.39it/s]
Counter({<Status.ok: 1>: 43, <Status.not_found: 2>: 35})
78
--------------------
43 flags downloaded.
35 not found.
Elapsed time: 1.62s


###### 运行 flags2_threadpool.py 脚本,使用 100 个并发请求(-m 100)从 ERROR 服务器中下载100 面国旗(-al 100)

In [None]:
# ERROR server: first 100 codes from the country-code file (-a -l 100), up to 100 concurrent requests
!python3 ch17/flags2_threadpool.py -s ERROR -al 100 -m 100

ERROR site: http://localhost:8003/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|████████████████████████████████████████| 100/100 [00:00<00:00, 202.38it/s]
--------------------
72 flags downloaded.
28 error s.
Elapsed time: 0.70s


### 17.5.3 线程和多进程的替代方案

- 如果 `futures.ThreadPoolExecutor` 类对某个作业来说不够灵活，可以使用 `threading` 模块中的组件(如 `Thread`, `Lock`, `Semaphore`) 自行制定方案
  - 使用 `queue` 模块可以创建线程安全的队列
- 如果应用场景复杂，可以使用 `multiprocessing` 模块来代替 `futures.ProcessPoolExecutor` 类，其 API 与 `threading` 模块相仿，不过作业交给多个进程处理