# asyncio 并发编程

<b style="color: red"> 适合 asyncio API 的协程在定义体中必须使用 `yield from` ，而不能用 `yield` 。</b>

## `@asyncio.coroutine`

交给 asyncio 处理的协程要使用 `@asyncio.coroutine` 装饰，这虽不是强制要求，但是建议这么做。因为这样能在一众普通函数中把协程凸显出来，也有助于调试：如果还没从协程中产出值，协程就被垃圾回收了，可以发出警告。


## asyncio 基本思想 (面向事件编程)

在 asyncio 中，基本的流程和 **使用协程进行离散事件仿真** 中提到的是一样的：在一个单线程中使用主循环依次激活队列里的协程。
各个协程向前执行几步，然后把控制权让给主循环，主循环再激活队列里的下一个协程。

编写基于 asyncio 的程序是要注意下述细节：

- 编写的协程链始终通过把最外层委派生成器传给 asyncio 包中的某个函数驱动，例如 `loop.run_until_complete()` 。即我们的代码不通过调用 next() 函数或 send() 方法驱动协程。驱动由 asyncio 包实现的事件循环去做
- 编写的协程链最终通过 yield from 把职责委托给 asyncio 包中的某个协程函数，如 `yeild from asyncio.sleep()` ，或者其他库中实现高层协议的协程，如 `response = yield from aiohttp.request('GET', url)` 。也就是说，最内层的子生成器是库中真正执行 I/O 操作的函数，而不是我们自己编写的函数

概括起来就是：使用 asyncio 包时，我们编写的代码中包含委派生成器，而生成器最终把职责委托给 asyncio 包或第三方库中的协程。
这种处理方式相当于架起了管道，让 asyncio 事件循环驱动执行低层异步 I/O 操作的库函数。

## `asyncio.Future`

在 asyncio 包中，`BaseEventLoop.create_task()` 方法接收一个协程，排定它的运行时间，然后返回一个 `asyncio.Task` 实例，也是 `asyncio.Future` 类的实例，因为前者是后者的子类，用于包装协程。

`asyncio.Future` 类的目的是与 yield from 一起使用，通常不需要使用以下方法：

- 无需调用 `asyncio.Future.add_done_callback()` ，因为可以直接把在 Future 运行结束后执行的操作放在 yield from 表达式后面
- 无需调用 `asyncio.Future.result()` ，因为 yield from 从 Future 对象中产出的值就是结果，例如：`result = yield from my_future`

### 从 Future ，Task，和协程中产出值

在 asyncio 包中，可以这样写：`result = yield from foo()` ，其中 foo 可以是协程函数，或者是返回 asyncio.Future 或 Task 实例的普通函数，<b style="color: red">这是 asyncio 包的 API 中很多地方可以互换协程和 Future 对象的原因之一</b>。

获取 Task 对象有两种主要方式：

- `asyncio.async(coro_or_future, *, loop=None)`

  这个函数排定了协程的运行时间并统一了协程和 Future ：如果第一个参数是 Future 或 Task 对象，则原封不动地返回；如果是协程，则会调用 `loop.create_task()` 方法创建 Task 对象。loop 关键词参数是可选的，用于传入事件循环，如果没有传入，则将调用 `asyncio.get_event_loop()` 获取
  
  
- `BaseEventLoop.create_task(coro)`

  这个方法排定了协程的执行时间，返回 Task 对象

### 协程和 Future 测试脚本

In [1]:
import asyncio
import time

def run_sync(coro_or_future):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro_or_future)

async def test_coro():
    time0 = time.time()
    await asyncio.sleep(3)
    time_delta = time.time() - time0
    return time_delta

run_sync(test_coro())

3.0035078525543213

## 常用 API

### `BaseEventLoop.run_in_executor(executor, func, *args)`

asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象，可以调用 `run_in_executor` 方法，把可调用对象发给它执行。

第一个参数是 Executor 实例，如果为 None ，则使用默认的 ThreadPoolExecutor 实例。

In [None]:
loop = asyncio.get_event_loop()
loop.run_in_executor(None, )

### `asyncio.as_complete(fs, *, loop=None, timeout=None)`

In [2]:
import asyncio
import time

async def foo(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def coro():
    fs = [foo(10), foo(5), foo(1)]
    for f in asyncio.as_completed(fs):
        time0 = time.time()
        result = await f
        print(result, "delta", time.time() - time0)

asyncio.get_event_loop().run_until_complete(coro())

1 delta 1.0015299320220947
5 delta 4.003159999847412
10 delta 4.999229192733765


### `asyncio.Semaphore(value=1, *, loop=None)`

Semaphore 类用于限制并发请求数量。Semaphore 对象维护一个内部计数器，如果在对象上调用 `acquire()` 方法，计数器递减；如果调用 `release()` 方法，计数器递增。可以把 Semaphore 对象当作上下文管理器使用。

In [3]:
time0 = time.time()

async def foo(semaphore):
    with (await semaphore):        
        await asyncio.sleep(2)
        print("time delta:", time.time() - time0)

async def coro():
    semaphore = asyncio.Semaphore(3)
    fs = [foo(semaphore) for _ in range(5)]
    for f in asyncio.as_completed(fs):
        await f

asyncio.get_event_loop().run_until_complete(coro())

time delta: 2.003462076187134
time delta: 2.0037269592285156
time delta: 2.0046191215515137
time delta: 4.005551099777222
time delta: 4.005756139755249


### `asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)`

参数是一个由 Future 或协程构成的可迭代对象，wait 会分别把各个协程包装进一个 Task 对象。
wait 是协程函数，因此**它不会阻塞**，默认行为是等传给它的所有协程运行完毕后结束。

In [4]:
import random
async def foo():
    sec = random.randint(1, 10)
    await asyncio.sleep(sec)
    return sec

to_do = [foo() for _ in range(10)]
wait_coro = asyncio.wait(to_do)
asyncio.get_event_loop().run_until_complete(wait_coro)

({<Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=1>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=1>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=4>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=4>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=4>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=6>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=6>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=6>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=7>,
  <Task finished coro=<foo() done, defined at <ipython-input-4-f7dea6d843fa>:2> result=7>},
 set())