# Python多线程
CPython解释器本身有全局解释器锁，因此一次只运行一个线程执行Python字节码。对于IO密集型任务来说，CPython解释器会在标准库中所有执行阻塞性I/O操作的函数等待操作系统返回结果时释放GIL锁，这意味着Python可在这个层次上使用多线程。
# 使用concurrent.futures模块下载
该模块的特色在于ThreadPoolExecutor和ProcessPoolExecutor类，这两个类在内部维护着一个工作线程或进程池，以及要执行的任务队列
## concurrent.futures.ThreadPoolExecutor实现并行下载
使用ThreadPoolExecutor.map方法可以最便捷的方式针对具体任务实现多线程并发。  
1.executor.map会返回一个执行结果生成器。  
2.ThreadPoolExecutor对象本身实现了上下文管理器协议，其\__exit__方法会调用自己的shutdown(wait=True)方法，阻塞等待全部线程执行完毕。  
3.迭代执行结果生成器来获取线程的返回结果，如果有线程抛出异常，会在这里抛出。

In [11]:
from concurrent import futures
import requests
import time
import sys
import os

POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    #  open(path, 'wb')f返回上下文管理器，然后调用其__enter__方法返回了自身
    with open(path, 'wb') as fp:
        fp.write(img)
        
def get_flag(cc):
    '''
    下载国家缩写对应的国旗
    '''
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()
    
# 主程序，接受可执行对象执行下载任务
def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    
MAX_WORKERS = 20

# 单次下载function
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

# 使用ThreadPoolExecutor.map实现多线程下载
def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list)) # 选取二者最小值，以免创建多余的线程
    # futures.ThreadPoolExecutor(workers)会得到上下文管理器（ThreadPoolExecutor对象），然后__enter__方法return self。
    # 其__exit__方法会调用自己的shutdown(wait=True)，该方法会阻塞等待全部线程执行完毕。
    with futures.ThreadPoolExecutor(workers) as executor:
        # 启动多线程，同时将Iterable对象的元素作为参数传给download_one方法
        res = executor.map(download_one, sorted(cc_list)) # 结果res为生成器对象，必须迭代来获取返回值
    # 迭代res生成器，如果有线程抛出异常，会在这里抛出
    return len(list(res))

if __name__ == '__main__':
    main(download_many)

INEG  BR CN TR RU NGBD FRJPVN  DE ID   IR CD ET PKMX  US PH 
20 flags downloaded in 1.15s


## future的含义
1.future是concurrent.futures和asyncio的重要组件，标准库有两个Future类：concurrent.futures.Future和asyncio.Future。两个类的作用相同，都是__用于封装待完成的操作，Future实例代表已经完成的或尚未完成的延迟计算，它本身代表着已排期的任务__。

2.当把某件任务（可调用对象）交给concurrent.futures.Executor的子类（如：ThreadPoolExecutor和ProcessPoolExecutor类）时才会创建concurrent.futures.Future实例用于对任务进行future概念封装。

3.两种Future类都有done()方法，会立即返回其内部的可调用对象是否已经执行（boolean），客户端一般不会询问future的状态甚至改变其状态，而交由并发框架（concurrent.futures和asyncio）管理。

4.两种Future类都有add_done_callback()方法，用于添加任务结束后的回调任务。__另外也都有result()方法，该方法会阻塞直到执行任务完成或抛出异常，用于返回可执行对象返回的结果，或重新抛出执行过程中的异常。__
## executor.map的执行过程
#### executor.map并不返回future对象，而是在其内部完成管理。它会返回一个生成器，该生成器在迭代时会调用各个future对象的result方法（与提交任务时的顺序相同，如果当前future未执行完则阻塞等待），因此我们得到的是各个future的结果。

In [9]:
from time import sleep, strftime

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args) # args拆包
    
# 显示信息与线程休眠,根据延迟时间n返回缩进量
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10
    
def main():
    display('Script starting.')
    # 不使用ThreadPoolExecutor的上下文管理器机制，而是直接调用map启动ThreadPoolExecutor
    executor = futures.ThreadPoolExecutor(max_workers=3) # 最大执行线程数
    # map函数提交任务后，ThreadPoolExecutor内部立刻进行排期创建Future对象，延迟时间为0的future会立刻执行
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    # 迭代结果生成器，迭代过程按任务提交顺序调用future.result()（阻塞方法，任务未完成则等待）
    for i, result in enumerate(results):
        display('result {}:{}'.format(i, result))
        
main()

[17:24:14] Script starting.
[17:24:14] loiter(0): doing nothing for 0s...
[17:24:14] loiter(0): done.
[17:24:14] 	loiter(1): doing nothing for 1s...
[17:24:14][17:24:14][17:24:14]   			loiter(3): doing nothing for 3s...
results: <generator object Executor.map.<locals>.result_iterator at 0x000001C3116AA6D0>
[17:24:14] Waiting for individual results:
[17:24:14] result 0:0
		loiter(2): doing nothing for 2s...
[17:24:15] 	loiter(1): done.
[17:24:15] 				loiter(4): doing nothing for 4s...
[17:24:15] result 1:10
[17:24:16] 		loiter(2): done.
[17:24:16] result 2:20
[17:24:17] 			loiter(3): done.
[17:24:17] result 3:30
[17:24:19] 				loiter(4): done.
[17:24:19] result 4:40


上述执行结果中，loiter(0)立刻启动并执行完毕，同时loiter(1)、loiter(2)开始执行，同时由于loiter(0)的结束，loiter(3)开始执行，这些是同时发生的。
## 使用concurrent.futures.as_completed获取结果
### 比较
1.concurrent.futures.Executor子类的map方法虽然能很方便的执行多线程任务，但__只能处理一种任务（只能指定一个可执行对象）不够灵活。另外一点是，map返回的结果（通过迭代生成器）是顺序的，不管后提交的任务是否先执行完毕，都要等待第一个任务的完成（等待future.result()返回）。__

2.concurrent.futures.as_completed+executor.submit()的方式更灵活，而且迭代concurrent.futures.as_completed返回的迭代器时，__进行迭代时会优先返回已经执行完的future对象__。而且__concurrent.futures.as_completed可接受多个concurrent.futures.Executor子类实例submit()返回的future对象，甚至可来自于concurrent.futures.ProcessPoolExecutor__。

#### 相较于concurrent.futures.Executor子类的map方法，concurrent.futures.as_completed+executor.submit()的方式更加灵活。
### 示例

In [15]:
from flags2_common import main, HTTPStatus
from collections import namedtuple, Counter
import tqdm

Result = namedtuple('Result', 'status cc')
DFAULT_CONCUR_REQ = 30 # 默认并发数
MAX_CONCUR_REQ = 1000 # 最大并发数

# 重新定义download_one，增加了处理异常的部分
def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc: # 处理HTTPError异常
        res = exc.response
        if res.status_code == 404: # 只处理404
            status = HTTPStatus.not_found
            msg = 'not found'
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'
        
    if verbose:
        print(cc, msg)
    
    return Result(status, cc)

# 使用concurrent.futures.as_completed+executor.submit()实现多线程
def download_many(cc_list, base_url, verbose, concur_req):
    counter = Counter()
    # 使用上下文管理器，这里Executor的__exit__会调用自己的shutdown(wait=True)阻塞等待全部线程执行完
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            # 重要1：executor.submit()提交任务并得到对应的Future对象。（第一个被提交的任务显然会立即执行，与executor.map方法一致）
            future = executor.submit(download_one, cc, base_url, verbose)
            # 这里用future做key，因为后续会先获取执行完毕的future对象，然后去找其对应国家。
            cc_to_map[future] = cc
        # 重要2：利用futures.as_completed()方法获取future的结果，这里得到迭代器。
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, len(cc_list)) # done_iter无len函数，需显示设定迭代器长度。
        # 重要3：迭代器优先返回已执行完的future对象，如果没有则阻塞等待。对返回结果进一步处理。
        for future in done_iter:
            # 处理线程抛出的异常
            try:
                result = future.result() # 阻塞，不过从done_iter得到的future一定是执行完毕的
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reson}'
                error_msg = error_msg.format(res = exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = ''
                status = res.status
                
            if error_msg:
                status = HTTPStatus.error
            Counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future] # 重要4：利用future map找到信息，虽然future的产出乱序，但仍能找到对应信息来处理
                print('*** Error for {}: {}'.format(cc, error_msg))
            
        return counter

## concurrent.futures.ProcessPoolExecutor启动进程
它与另外一个concurrent.futures.Executor子类ThreadPoolExecutor的区别在于，ProcessPoolExecutor的max_workers参数可选，默认值时os.cpu_count()返回的数量。另外ProcessPoolExecutor可实现真正的多进程并行计算以绕过GIL的限制，适合处理CPU密集型任务。