## 线程和协程

### 线程

这个一个多线程的显示脚本，一共两个线程，主线程负责显示最后的输出，开辟的子线程负责显示转圈圈。

In [3]:
import threading 
import itertools 
import time 
import sys 

class Signal: 
    go = True 
    
def spin(msg, signal): 
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'): 
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status)) 
    
def slow_function():
    # 假装等待I/O一段时间
    time.sleep(3) 
    return 42 

def supervisor():
    signal = Signal()
    # 设置一个从属线程
    spinner = threading.Thread(target=spin,args=('thinking!', signal))
    print('spinner object:', spinner)
    # 启动从属线程
    spinner.start()
    # 运行slow_function函数，阻塞主线程，从而使得从属线程得以运行
    result = slow_function()
    signal.go = False
    # 等待从属线程结束
    spinner.join()
    return result 

def main():
    result = supervisor()
    print('Answer:', result) 

In [4]:
main()

spinner object: <Thread(Thread-4, initial)>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          Answer: 42


ython 没有提供终止线程的 API，这是有意为之的。若想关闭线 程，必须给线程发送消息。

### 协程

协程版本，最后的协程`supervisor()`里面一共有两个任务，通过`await asyncio.sleep()`切换协程的工作。

从Python3.5开始协程用`async 与await` 代替了`@asyncio.coroutine`与`yield.from`

In [5]:
import asyncio 
import itertools 
import sys 

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # 继续回到初始位置
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.1)
        # 捕获错误
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))
    
async def slow_function():
    await asyncio.sleep(3)
    return 42

async def supervisor():
    # 把协程包装成为一个task任务
    spinner = asyncio.ensure_future(spin('thinking!'))
    # 已经是一个任务的，返回的还是一个任务
    spinner = asyncio.ensure_future(spinner)
    print('spinner object:', spinner)
    # 激活等待slow_function任务，由于slow_function里面有sleep，到时候会把控制权转给spinnner
    # 因为Python在遇到time.sleep()时，会解锁GIL
    slow_function1 = asyncio.ensure_future(slow_function())
    result = await slow_function1
    # 得到result，取消spinner任务
    # await asyncio.sleep(5)
    spinner.cancel()
    return result
 
def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer', result)

In [6]:
main()

RuntimeError: This event loop is already running

spinner object: <Task pending coro=<spin() running at <ipython-input-5-979fa14abc2d>:5>>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          

`asyncio.ensure_future（...）`接收的是一个协程，排定它的运行时间，然后返回个`asyncio.Task`实例，也就是`asyncio.Future`的实例，因为`Task`是`Future`的子类，用于包装协程。

`task`或者`future`都有`.done()`, `.add_done_callback(....)`和`.result()`等方法，只不过这些方法一般用的比较少，只要`result=await myfuture()`，其中`await`后面需要回调的参数就是,`result`就是`task`的`result`。

无需调用m`y_future.add_done_callback(...)`，因为可以直接把想在`future`运行结束后执行的操作放在协程中 `await my_futre`表达式的后面。这个是协程的一大优势：协程是可以暂停和恢复函数的。

无需调用`my_future.result()`,因为`await` 从 `future`中产出的值就是结果。（列如，`result = await my_future`）。

**在一个`asyncio`中，基本的流程是一样的：在一个单线程程序中使用主循环一次激活队列里的协程。各个协程向前执行几步，然后把控制权让给主循环，主循环再激活队列里的下一个协程。**

协程不能直接调用，即不能像调用普通函数那样来启动回调链。我们必须使用事件循环显式排定协程的执行时间，或者在其他排定了执行时间的协程中使用 await 表达式把它激活。

In [None]:
import asyncio
import aiohttp

async def get_flag(cc):
    url = f'{BASE_URL}/{cc}/{cc.lower()}.gif'
    async with aiohttp.ClientSeeeion() as sess:
        # 运行底层库函数session.get(url)
        async with sess.get(url) as resp:
            # resp.read()是一个协程，必须使用await语句获取响应内容
            image = await resp.read()
    return image

async def download_one(cc):
    # 管道传递给下一个协程函数
    img = await get_flag(cc)
    show(cc)
    save_flag(img, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    # 创建事件循环
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in soretd(cc_list)]
    # asyncio.wait()函数不是阻塞性函数，wait是一个协程，等传给它的所有协程运行完毕后结束
    wait_coro = asyncio.wait(to_do)
    # 执行事件循环，运行wait_coro里面的task，事件执行过程中，这个脚本会在这里阻塞
    res, _ = loop.run_until_complent(wait_coro)
    # 关闭事件循环
    loop.close()
    return len(res)

尽管线程版和 asyncio 版 HTTP 客户端的下载总时间相差无几， 但是 asyncio 版发送请求的速度更快.

**异步操作是交叉执行的。**

在 `download_many` 函数中调用 `loop.run_until_complete` 方法时，事件循环驱动各个 `download_one` 协程.各个 `download_one` 协程运行到第一个 `await` 表达式处，那个表达式又驱动各个 `get_flag` 协程，运行到第一个 `await` 表达式处， 调用 `aiohttp.request(...)` 函数。这些调用都不会阻塞，因此在零点几秒内所有请求全部开始。

`asyncio` 的基础设施获得第一个响应后，事件循环把响应发给等待结果的 `get_flag` 协程。得到响应后，`get_flag` 向前执行到下一个 `await` 表达式处，调用 `resp.read()` 方法，然后把控制权还给主循环。 其他响应会陆续返回（因为请求几乎同时发出）。**所有 `get_ flag` 协程都获得结果后**，委派生成器 `download_one` 恢复，保存图像文件。

`loop.run_until_complete` 方法的参数是一个期物或协程。如果是协 程，`run_until_complete` 方法与 `wait` 函数一样，把协程包装进一个 `Task` 对象中。

每次请求时，`download_many`函数会创建一个`download_one`协程对象。这些协程对象先使用`asyncio.wait`协程包装，然后由`loop.run_until_complete`方法驱动。

`await foo`句法能防止阻塞，是因为当前协程（即包含`await`代码的委派生成器）暂停后，控制权回到事件循环手中，再去驱动其它协程；`foo`期物或协程运行完毕后，把结果返回给暂停的协程，将其恢复。

使用 `asyncio` 包时，我们编写的代码不通过调用 `next(...)` 函数或 `.send(...)` 方法驱动协程——这一点由 `asyncio` 包实现的事件循环去做。 

概括起来就是：使用 asyncio 包时，我们编写的异步代码中包含由 asyncio 本身驱动的协程（即委派生成器），而生成器最终把职责委托 给 asyncio 包或第三方库（如 aiohttp）中的协程。这种处理方式相 当于架起了管道，让 asyncio 事件循环（通过我们编写的协程）驱动 执行低层异步 I/O 操作的库函数。

## 避免阻塞型调用 

有两种方法能避免阻塞型调用中止整个应用程序的进程： 

+ 在单独的线程中运行各个阻塞型操作 
+ 把每个阻塞型操作转换成非阻塞的异步调用使用。

多个线程是可以的，但是各个操作系统线程（Python 使用的是这种线 程）消耗的内存达兆字节（具体的量取决于操作系统种类）。如果要处理几千个连接，而每个连接都使用一个线程的话，我们负担不起。

为了降低内存的消耗，通常使用回调来实现异步调用。

把生成器当作协程使用是异步编程的另一种方式。对事件循环来说，调用回调与在暂停的协程上调用 `.send()` 方法效果差不多。各个暂停的协程是要消耗内存，但是比线程消耗的内存数量级小。而且，协程能避免可怕的“回调地狱”。

使用 asyncio 包的程序中只有一个主线程，而在这个线程中不能有阻塞型调用，因为事件循环也在这个线程中运行。

可以接收错误，并且在保存图片的时候，也采取了异步的措施.

In [8]:
import asyncio
import collections
import aiohttp
from aiohttp import web
import tqdm

In [9]:
DEFAULT_CONCUR_REQ = 5 
MAX_CONCUR_REQ = 1000

In [None]:
class FetchError(Exception):
    def __init__(self, country_code):
        # 继承父类的args添加属性,python cookbook书中介绍最好这样写
        super(FetchError, self).__init__(country_code)
        self.country_code = country_code
        
        
async def get_flag(base_url, cc):
    url = f'{base_url}/{cc}/{cc.lower()}.gif'
    async with aiohttp.ClientSession() as sess:
        async with sess.get(url) as resp:
            # 正常响应
            if resp.status == 200:
                img = await resp.read()
                return img
            # 404错误
            elif resp.status == 404:
                raise web.HTTPNotFound
            # 其他错误
            else:
                raise aiohttp.ClientHttpProxyError(code=resp.status, 
                                                  message=resp.reason,
                                                  headers=resp.headers)
                
                
async def download_one(cc, base_url, semaphore, verbose):
    try:
        # 这段代码保证，任何时候都不会有超过 concur_req 个 get_flag 协程启动。 
        with (await semaphore):
            img = await get_flag(base_url, cc)
    # 404
    except web.HTTPNotFound:
        # 用Enum关系表达式保存status
        status = HTTPStatus.not_found
        msg = 'not found'
    # 其它引起的报错为基础错误Exception，上浮给调用者
    except Exception as exc:
        # download_one 函数抛出的各个异常都包装在 FetchError 对象里， 并且链接原来的异常;raise X from Y 句法链接原来的异常。 
        raise FetchError(cc) from exc
    # 正常情况下，保存图片
    else:
        loop = asyncio.get_event_loop()
        # run_in_executor方法的第一个参数是Executor实例；如果设为None，使用事件循环的默认为ThreadPoolExecutor
        # 余下的参数是可调用的对象，以及可调用对象的位置参数。
        loop.run_in_executor(None, save_flag, img, cc.lower()+'.gif')
        status = HTTPStatus.ok
        msg = 'ok'
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)


async def download_coro(cc_list, base_url, verbose, concur_rep):
    counter = collections.Counter()
    # Semaphore 类是同步装置，用于限制并发请求数量。 
    semaphore = asyncio.Semaphore(concur_rep)
    # 多次调用 download_one 协程，创建一个协程对象列表
    to_do = [download_one(cc, base_url, semaphore, verbose) for cc in cc_list]
    #  as_completed 函数必须在协程中调用
    to_do_iter = asyncio.as_completed(to_do)
    
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
        for future in to_do_iter:
            try:
                # 迭代运行结束的期物；获取 asyncio.Future 对象的结果，最简单的方法是使用 await，而不是调用 future.result()
                res = await future
            # 因为download_one函数的所有异常都被包装在FetchError对象里
            except FetchError as exc:
                country_code = exc.country_code
                 # 尝试获取引起错误的实例的参数args
                try:
                    # 尝试从原来的异常（__cause__)中获取错误消息
                    error_msg = exc.__cause__.args[0]
                # 如果由于取不到索引而引起错误的实例的类的名字
                except IndexError:
                    # 如果在原来的异常中找不到错误消息，使用所链接异常的类名作为错误消息
                    error_msg = sec.__cause__.__class__.__name
                if verbose and error_msg:
                    msg = '*** Error for {}: {}'
                    print(msg.format(country_code, error_msg))
                # 其他的错误类型用Enum的最后一种关系表达式
                status = HTTPStatus.error
            else:
                status = res.status
            # 统计各种状态
            counter[status] += 1
        return counter
    

def download_many(cc_list, base_url, verbose, concur_rep):
    """
    download_many 函数只是实例化 downloader_coro 协程，然后通过 run_until_complete 方法把它传给事件循环。
    现在，download_many 函数只用于设置事件循环，并把 downloader_coro 协程传给 loop.run_until_complete 方法，调度 downloader_coro。 
    """
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, base_url, verbose, concur_rep)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

为什么将`save_flag` 函数包装在`run_in_executor()`中？

`save_flag` 函数会执行硬盘 I/O 操作，而这应该异步执行。`save_flag` 函数阻塞了客户代码与 `asyncio` 事件循环共用的唯一线程，因此保存文件时，整个应用程序都会冻结。阻塞型 I/O 调用在背后会释放 GIL，因此另一个线程可以继续。

`asyncio` 的事件循环在背后维护着一个 `ThreadPoolExecutor` 对象， 我们可以调用 `run_in_executor` 方法，把可调用的对象发给它执行。