# 协程 & asyncio & 异步编程

- 视频：https://www.bilibili.com/video/BV1cK4y1E77y
- 讲师：武沛奇

为什么要讲？

- 越来越多的学生都来问async异步相关问题，并且这一部分的知识点不太容易学习（异步非阻塞、asyncio）
- 异步相关话题和框架越来越多，例如：tornado、fastapi、django 3.x asgi、aiohttp都在异步 -> 提升性能。

如何讲解？

- 第一部分：协程。
- 第二部分：asyncio模块进行异步编程。
- 第三部分：实战案例。

## 1. 协程

协程不是计算机提供，程序员人为创造。
协程（Coroutine），也可以被称为微线程，是一种用户态内的上下文切换技术。简而言之，其实就是通过一个线程实现代码块相互切换执行。例如：

```Python
def func1():
    print(1)
    ...
    print(2)

def func2():
    print(3)
    ...
    print(4)

func1()
func2()
```

实现协程有这么几种方法：

- greenlet，早期模块。
- yield关键字。
- asyncio装饰器（py3.4）
- async、await关键字（py3.5）【推荐】

### 1.1greenlet实现协程

`pip3 install greenlet`

In [None]:
from greenlet import greenlet


def func1():
    print(1)  # 第1步：输出1
    gr2.switch()  # 第3步：切换到func2函数
    print(2)  # 第6步：输出2
    gr2.switch()  # 第7步：切换到func2函数，从上一次执行的位置继续向后执行


def func2():
    print(3)  # 第4步：输出3
    gr1.switch()  # 第5步：切换到func1函数，从上一次执行的位置继续向后执行
    print(4)  # 第8步；输出4


gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()  # 第1步：去执行func1函数

### 1.2 yield 关键字

In [None]:
def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()

for item in f1:
    print(item)

### 1.3 asyncio

在 Python3.4 之后的版本

In [None]:
import asyncio


@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到I0耗时操作，自动化切换到tasks中的其他任务
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)  # 遇到Io耗时操作，自动化切换到tasks中的其他任务
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意：遇到 IO 阻塞自动切换

### 1.4 async & await 关键字

在 Python3.5 之后的版本

In [None]:
import asyncio


async def func1():
    print(1)
    # 遇到 IO 耗时操作，自动化切换到 tasks 中的其他任务
    await asyncio.sleep(2)
    print(2)


async def func2():
    print(3)
    # 遇到 IO 耗时操作，自动化切换到 tasks 中的其他任务
    await asyncio.sleep(2)
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

## 2. 协程意义

在一个线程中如果遇到 IO 等待时间，线程不会傻等，利用空闲时间去完成其他事件。

案例：去下载三张图片（网络 IO）

- 普通方式

In [None]:
import requests


def download_image(url):
    print("开始下载:", url)
    # 发送网络请求, 下载图片
    response = requests.get(url)
    print("下载完成")
    # 图片保存到本地文件
    filename = url.rsplit("/")[-1]
    with open(filename, mode='wb') as file_object:
        file_object.write(response.content)


if __name__ == '__main__':
    url_list = [
        'https://wx3.sinaimg.cn/mw690/5cfc088ely1gq1gvkbg53j20u00u0juu.jpg',
        'https://wx3.sinaimg.cn/mw690/5cfc088ely1gq1gvkbfokj20jd0jdta3.jpg',
        'https://wx4.sinaimg.cn/mw690/5cfc088ely1gq1gvkblcaj20mq0sejta.jpg'
    ]
    for item in url_list:
        download_image(item)

- 协程方式

In [None]:
import aiohttp
import asyncio


async def fetch(session, url):
    print("send request:", url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        filename = url.rsplit("/")[-1]
        with open(filename, mode='wb') as file_object:
            file_object.write(content)


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://wx3.sinaimg.cn/mw690/5cfc088ely1gq1gvkbg53j20u00u0juu.jpg',
            'https://wx3.sinaimg.cn/mw690/5cfc088ely1gq1gvkbfokj20jd0jdta3.jpg',
            'https://wx4.sinaimg.cn/mw690/5cfc088ely1gq1gvkblcaj20mq0sejta.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

if __name__ == "__main__":
    # 教程给出的这行代码会报错，换成下面两行就好了
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

## 3. 异步编程

### 3.1 事件循环

理解成一个死循环，去检测并执行某些代码

```
任务列表 = [任务1, 任务2, 任务3, 任务4]

while True:
    可执行的任务列表，已完成的任务列表 = 去任务列表中检查所有的任务，将'可执行'和'已完成'的任务返回
    
    for 就绪任务 in 可执行的任务列表：
        执行已就绪的任务

    for 已完成的任务 in 已完成的任务列表：
        在任务列表中移除已完成的任务

    如果任务列表中的任务都已完成，则终止循环
```

```
import asyncio

# 生成或获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务放到 `任务列表`
loop.run_until_complete(任务)
```

### 3.2 快速上手

协程函数，定义函数为 `async def func()` 

写成对象，执行 协程函数() 得到的协程对象

```
async def func():
    pass

result = func()
```

注意：执行协程函数创建写成对象，函数内部代码并不会执行

如果想要运行协程函数内部代码，必须要将协程对象交给事件循环来处理

In [None]:
import asyncio

async def func():
    print(1)

result = func()
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
asyncio.run(result) # python 3.7 之后

### 3.3 await 关键字

await + 可等待对象（协程对象、Future、 Task 对象 -> 可理解为 IO 等待）

In [None]:
import asyncio

async def func():
    print(1)
    r = await asyncio.sleep(2)
    print("结束", r)

asyncio.run(func())

# Output:
# 1
# (等待 2 秒)
# 结束 None

In [None]:
import asyncio


async def others():
    print("start")
    r = await asyncio.sleep(2)
    print("end")
    return "ret"


async def func():
    print("inner")
    # 遇到 IO 操作挂起当前协程（任务），等 IO 操作完成之后再继续往下执行。
    # 当前协程挂起时，事件循环可以去执行其他协程（任务）。
    r = await others()
    print("结束", r)

asyncio.run(func())

# Output:
# inner
# start
# (等待 2 秒)
# end
# 结束 ret

In [None]:
import asyncio


async def others():
    print("start")
    r = await asyncio.sleep(2)
    print("end")
    return "ret"


async def func():
    print("inner")
    r = await others()
    print("结束", r)

    r = await others()
    print("结束", r)

asyncio.run(func())

# inner
# start
# (等待 2 秒)
# end
# 结束 ret
# start
# (等待 2 秒)
# end
# 结束 ret

await 就是等待对象的值得到结果之后再继续往下走

### 3.4 Task 对象

> *Tasks* are used to schedule coroutines concurrenty.
>
> When a coroutine is wrapped into a *Tasks* with functions like `asyncio.create _task()` the coroutine is automatically scheduled to run soon.

Tasks 用于并发调度协程，通过 `asyncio.create_task(协程对象)` 的方式创建 Task 对象，这样可以让协程加入事件循环中等待被调度执行。除了使用 `asyncio.create_task()` 函数以外，还可以用低层级的 `loop.create_task()` 或 `ensure_future()` 函数。不建议手动实例化 Task 对象。

In [None]:
import asyncio


async def func():
    print(1)
    r = await asyncio.sleep(2)
    print(2)
    return 3


async def main():
    print("start")
    # 创建Task对象，将当前执行func函数任务添加到事件循环。
    task1 = asyncio.create_task(func())
    # 创建Task对象，将当前执行func函数任务添加到事件循环。
    task2 = asyncio.create_task(func())
    print("end")
    # 当执行某协程遇到IO操作时，会自动化切换执行其他任务。
    #此处的await是等待相对应的协程全都执行完毕并获取结果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())

# start
# end
# 1
# 1
# (等待 2 秒)
# 2
# 2
# 3 3

In [None]:
import asyncio


async def func():
    print(1)
    r = await asyncio.sleep(2)
    print(2)
    return 3


async def main():
    print("start")

    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2")
    ]
    print("end")

    done, pending = await asyncio.wait(task_list)
    print(done)
    # done, pending = await asyncio.wait(task_list, timeout=1)
    # print(pending)

asyncio.run(main())

# start
# end
# 1
# 1
# 2
# 2
# {<Task finished name='n2' coro=<func() done, defined at foo.py:4> result=3>, 
#  <Task finished name='n1' coro=<func() done, defined at foo.py:4> result=3>}

In [None]:
import asyncio


async def func():
    print(1)
    r = await asyncio.sleep(2)
    print(2)
    return 3


# 当写到最顶层时，这样写会报错
# 因为 create_task 会立即将 func() 加入事件循环
# 而此时还未创建事件循环
# task_list = [
#     asyncio.create_task(func(), name="n1"),
#     asyncio.create_task(func(), name="n2")
# ]
task_list = [
    func(),
    func()
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

### 3.5 Future 对象

更底层，Task 的基类，Task 对象内部 await 结果的处理基于 Future 对象来的

In [None]:
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务（Future对象），这个任务什么都不干。
    fut = loop.create_future()
    # 等待任务最终结果（Future对象），没有结果则会一直等下去。
    await fut

asyncio.run(main())

In [None]:
import asyncio


async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")


async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 创建一个任务(Future对象)，没绑定任何行为，则这个任务永远不知道什么时候结束。

    fut = loop.create_future()
    # 创建一个任务(Task对象)，绑定了set_after函数，函数内部在2s之后，会给fu赋值。
    # 即手动设置future任务的最终结果，那么fut就可以结束了。
    await loop.createtask(set_after(fut))

    # 等待 Future对象获取最终结果，否则一直等下去
    data = await fut
    print(data)

asyncio.run(main)

### 3.5 concurrency.futures.Future 对象

使用使用线程池、进程池实现异步操作时用到的对象

In [None]:
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor


def func(value):
    time.sleep(1)
    print(value)
    return 123


# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool=ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)

以后写代码可能会存在交叉时间。例如：crm 项目 80% 都是基于协程异步编程+MySQL（不支持）【线程、进程做异步编程】。

In [None]:
import time
import asyncio
import concurrent.futures


def func1():
    # 某个耗时操作
    time.sleep(2)
    return "ret"


async def main():
    loop = asyncio.get_running_loop()

    # 1.Run in the default loop's executor (默认 ThreadPoolExecutor)
    # 第一步：内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行 func1 函数
    # 并返回一个 concurrent.futures.Future 对象
    # 第二步：调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asycio.Future 对象。
    # 因为 concurrent.futures.Future 对象不支持 await 语法，所以需要包装为 asycio.Future 对象才能使用。
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

    # 2.Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result=await loop.run_in_executor(pool, func1)
    #     print('custom thread pool', result)

    # 3.Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result=await loop.run_in_executor(pool, func1)
    #     print('custom process pool', result)
asyncio.run(main())

案例：async+不支持异步的模块

In [None]:
import asyncio
import requests


async def download_image(ur1):
    # 发送网络请求, 下载图片(遇到网络下载图片的IO请求, 自动化切换到其他任务)
    print("start", url)

    loop = asyncio.get_event_loop()
    # requests 模块默认不支持异步操作, 所以就使用线程池来配合实现了。
    future = loop.run_in_executor(None, requests.get, ur1)
    response = await future
    print('下载完成')
    # 图片保存到本地文件
    file_name = url.rsplit('/')[-1]
    with open(file_name, mode='wb')as file_object:
        file_object.write(response.content)

if __name__ == '__main_':
    url_list = [

    ]
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

### 3.6 异步迭代器

什么是异步迭代器

实现了 `__aiter__()` 和 `__anext__()` 方法的对象。`__anext__()` 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 `__anext__()` 方法所返回的可等待对象，直到其引发一个 stopAsyncrteration 异常。由 PEP 492 引入。

什么是异步可迭代对象？

可在 asyncfor 语句中被使用的对象。必须通过它的 `__aiter__()` 方法返回一个 asynchronous iterator 。由PEP 492引入。

In [None]:
import asyncio


class Reader(object):
    """自定义异步选代器(同时也是异步可选代对象)"""

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise stopAsyncIteration
        return val

async def func():
    obj = Reader()
    async for item in obj:
        print(obj)

asyncio.run(func())

### 3.8 异步上下文管理器

此种对象通过定义 `__aenter__()` 和 `__aexit__()` 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。

In [None]:
import asyncio


class AsynccontextManager:
    def __init__(self):
        self.conn = conn


async def do_something(se1f):
    # 异步操作数据库
    return 666


async def __aenter__(self):
    # 异步链接数据库
    self.conn = await asyncio.sleep(1)
    return self


async def __aexit__(self, exc_type, exc, tb):
    # 异步关闭数据库链接
    await asyncio.sleep(1)


async def func():
    async with AsynccontextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())

## 4. uvloop

是 async 的事件循环的替代方案，事件循环效率 > 默认 async 的事件循环

`pip install uvloop`

```
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 编写asyncio的代码，与之前写的代码一致。

 #内部的事件循环自动化会变为 uvloop
 asyncio.run(...)
```

注意：asgi -> uvicorn 内部使用的就是 uvloop

## 5. 实战案例

### 5.4 爬虫

In [None]:
import aiohttp
import asyncio


async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("得到结果:", url, len(text))
        return text


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'https://www.pythonav.com'
        ]

        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        done, pending = await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())